<a href="https://colab.research.google.com/github/LamDuyNg/Mining-Massive-Datasets-Final-Project/blob/main/task2_svd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, size, to_date
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col, udf, count, sum
from pyspark.sql.types import IntegerType, FloatType, ArrayType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import MultilayerPerceptronClassifier, RandomForestClassifier, LinearSVC, OneVsRest
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import DenseMatrix, Vectors
from pyspark.sql.functions import udf
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import shutil
import glob

In [4]:
class SVD:
  """Reduce the feature columns of a header-less CSV to their top-k SVD components.

  Column ``_c0`` of the input is treated as the label and every other column as a
  feature (pixel) value. ``run_svd`` factorises the feature matrix A ~= U S V^T with
  ``RowMatrix.computeSVD`` and exports ``label, _c1, ..., _cr`` rows whose components
  are the corresponding row of ``U * S`` (r <= k retained singular values).
  """

  # Output CSV names for the two CIFAR-10 splits this notebook processes; any other
  # input file is exported as "<input stem>-svd.csv" in the working directory.
  _OUTPUT_NAMES = {
      "cifar10-test-1k.csv": "cifar10-test-svd.csv",
      "cifar10-train-5k.csv": "cifar10-train-svd.csv",
  }

  def __init__(self, spark, filePath, k=64):
    """Store the Spark session, the input CSV path and the number of components ``k``."""
    self.spark = spark
    self.filePath = filePath
    self.k = k

  def read_file(self):
    """Read ``self.filePath`` as a header-less CSV with inferred column types."""
    return self.spark.read.csv(self.filePath, header=False, inferSchema=True)

  def seperate_label(self, data):
    """Split ``data`` into ``(labels, pixels)`` DataFrames.

    ``labels`` holds only column ``_c0``; ``pixels`` holds every other column.
    The method name keeps its original spelling so existing callers still work.
    """
    pixels = data.drop('_c0')    # every column except the label
    labels = data.select('_c0')  # first column holds the label
    return labels, pixels

  def preprocess_data(self, data):
    """Assemble columns 1..n into a ``features`` vector and rename ``_c0`` to ``label``.

    Returns a DataFrame with exactly the columns ``features`` and ``label``.
    """
    assembler = VectorAssembler(inputCols=data.columns[1:], outputCol="features")
    assembled_data = assembler.transform(data)
    return assembled_data.select("features", "_c0").withColumnRenamed("_c0", "label")

  @classmethod
  def _output_path(cls, file_path):
    """Return the CSV file name that the reduced version of ``file_path`` is written to."""
    name = os.path.basename(file_path)
    default = os.path.splitext(name)[0] + "-svd.csv"
    return cls._OUTPUT_NAMES.get(name, default)

  def export_cvs(self, data):
    """Write ``data`` as one header-less CSV file named by ``_output_path``.

    Spark always writes a directory of part files, so the frame is coalesced to a
    single partition, written to a scratch directory, and the lone part file is
    moved to the final name. The scratch directory is removed even if that fails.

    Raises:
      FileNotFoundError: if Spark produced no CSV part file.
    """
    output_path = self._output_path(self.filePath)
    # Scratch dir derived from the output name so different inputs never collide;
    # "overwrite" clears leftovers from an earlier interrupted run instead of failing.
    tmp_dir = output_path + ".parts"
    data.coalesce(1).write.mode("overwrite") \
        .options(header='False', delimiter=',').csv(tmp_dir)
    try:
      part_files = glob.glob(os.path.join(tmp_dir, "*.csv"))
      if not part_files:
        raise FileNotFoundError(f"Spark wrote no CSV part file under {tmp_dir!r}")
      # coalesce(1) leaves exactly one part file to promote.
      shutil.move(part_files[0], output_path)
    finally:
      shutil.rmtree(tmp_dir, ignore_errors=True)

  def run_svd(self):
    """Project every row onto its top-k singular directions and export the result.

    Side effects: sets ``self.svd`` (the SVD result), ``self.U`` (a ``RowMatrix``)
    and ``self.S`` (a diagonal ``DenseMatrix`` of the singular values), then writes
    ``label, _c1, ..., _cr`` rows via ``export_cvs``. ``computeSVD`` may keep
    r < k singular values (it drops those below its ``rCond`` threshold); only r
    component columns are written in that case.
    """
    data = self.preprocess_data(self.read_file())
    # Rows are scanned by every SVD iteration and again for the projection;
    # cache them so the CSV is not re-read and re-parsed on each pass.
    data.cache()
    try:
      mllib_vectors = data.select("features").rdd.map(
          lambda r: Vectors.dense(r.features.toArray()))
      self.svd = RowMatrix(mllib_vectors).computeSVD(self.k, computeU=True)

      sigma = self.svd.s.toArray()
      self.U = self.svd.U  # RowMatrix
      self.S = DenseMatrix(len(sigma), len(sigma), np.diag(sigma).ravel("F"))

      # Row i of U*S equals row i of A projected onto V (A = U S V^T => A V = U S).
      # Projecting each row in place keeps its label on the same row, so no join on
      # a generated id is needed (monotonically_increasing_id values only line up
      # across DataFrames that happen to share an identical partition layout).
      v_bc = self.spark.sparkContext.broadcast(self.svd.V.toArray())
      project = udf(lambda vec: vec.toArray().dot(v_bc.value).tolist(),
                    ArrayType(DoubleType()))
      reduced = data.select("label", project("features").alias("components"))
      merged_df = reduced.select(
          "label",
          *[reduced["components"][i].alias(f"_c{i+1}") for i in range(len(sigma))])

      self.export_cvs(merged_df)
      v_bc.unpersist()
    finally:
      data.unpersist()

if __name__ == "__main__":
    spark = SparkSession.builder.appName("SVD").getOrCreate()

    train_file = "cifar10-train-5k.csv"
    test_file = "cifar10-test-1k.csv"

    try:
        # Reduce the training split first, then the test split.
        for input_file in (train_file, test_file):
            svd = SVD(spark, input_file)
            svd.run_svd()
    finally:
        # Release the session even if one of the runs raises.
        spark.stop()