In [1]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,NGram,OneHotEncoderEstimator, VectorAssembler, PCA, StringIndexer
from pyspark.sql.functions import *
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.types import *
from nltk.stem.porter import *
from pyspark.ml import Pipeline 
from pyspark.sql.functions import col, size
import pyspark.sql.functions as F

import nltk
nltk.download('wordnet')

import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
#Dimensionality reduction with truncated SVD
from sklearn.decomposition import TruncatedSVD
from scipy.sparse import csr_matrix
import numpy as np
import pandas as pd

# n is the number of SVD components to keep.
n = 450


def _features_to_matrix(features_pd):
    """Stack the Spark ML vectors in ``features_pd['features']`` into a 2-D array.

    Each entry of the ``features`` column must expose ``toArray()`` (as Spark ML
    vectors do). The result has one row per entry, in the frame's row order.

    ``Series.as_matrix()`` was removed in pandas 1.0, so the rows are stacked
    with ``np.vstack`` instead of going through an object array.

    Raises:
        ValueError: if the frame has no rows (there is nothing to stack).
    """
    rows = [np.asarray(vector.toArray()) for vector in features_pd['features']]
    if not rows:
        raise ValueError("no feature rows to convert; the input frame is empty")
    return np.vstack(rows)


# Read preprocessed data
train_dt = spark.table("train_vector_assembled")
train_final_df = train_dt.select("features", "label")

test_dt = spark.table("test_vector_assembled")
test_final_df = test_dt.select("features", "label")

# Convert to a dense numpy matrix for sklearn.
# NOTE: toPandas() collects the whole table onto the driver.
train_features_pd = train_final_df.toPandas()
train_features = _features_to_matrix(train_features_pd)

test_features_pd = test_final_df.toPandas()
test_features = _features_to_matrix(test_features_pd)

# Run SVD: fit on the training matrix only, then project the test matrix with
# the same fitted components.
svd = TruncatedSVD(n_components=n, n_iter=7, random_state=42)
train_pca_features_arr = svd.fit_transform(train_features)
test_pca_features_arr = svd.transform(test_features)



In [3]:
#Convert sklearn format back to Spark DataFrame
from pyspark.sql.types import StructType, StructField, LongType

def with_column_index(sdf):
    """Return ``sdf`` with an extra non-nullable long column ``ColumnIndex``.

    The index is the position assigned to each row by ``RDD.zipWithIndex()``;
    the original columns are kept, in order, ahead of the new one.
    """
    index_field = StructField("ColumnIndex", LongType(), False)
    indexed_schema = StructType(sdf.schema.fields + [index_field])

    def _append_position(pair):
        # zipWithIndex yields (row, position); a Row is a tuple, so
        # concatenation appends the position as the last field.
        record, position = pair
        return record + (position,)

    indexed_rdd = sdf.rdd.zipWithIndex().map(_append_position)
    return indexed_rdd.toDF(schema=indexed_schema)


  
#refactor_df (below) returns a DataFrame to be passed to the model for training, with columns: pca_features, label


# Combines the n per-component columns "pca[0]" ... "pca[n-1]" into a single
# vector column named "pca_features".
pca_input_columns = ["pca[{0}]".format(component) for component in range(n)]
assembler = VectorAssembler(inputCols=pca_input_columns, outputCol="pca_features")



def refactor_df(original_df, svd_output):
  """Attach SVD-reduced features to the labels of ``original_df``.

  Args:
    original_df: Spark DataFrame that contains a ``label`` column.
    svd_output: 2-D numpy array with one row per row of ``original_df`` and
      one column per SVD component.

  Returns:
    Spark DataFrame with columns ``pca_features`` (ML vector) and ``label``.

  Raises:
    ValueError: if ``svd_output`` is not 2-dimensional.

  Rows are paired purely by position: both frames are indexed with
  ``with_column_index`` and inner-joined on that index.
  NOTE(review): this assumes ``original_df`` yields its rows in the same
  order as when ``svd_output`` was computed, and that both have the same
  number of rows (unmatched rows are dropped by the inner join) -- confirm.
  """
  svd_output = svd_output.astype('float64')
  if svd_output.ndim != 2:
    raise ValueError(
      "svd_output must be 2-dimensional, got {0} dimension(s)".format(svd_output.ndim))

  # Use the array's own width rather than a notebook-level constant, so a
  # mismatch cannot silently drop components.
  n_components = svd_output.shape[1]

  # One array column per row; avoids shipping every component to Spark twice
  # (once as scalar columns and once inside the array).
  svd_pd = pd.DataFrame({'pca': svd_output.tolist()})
  svd_df = spark.createDataFrame(svd_pd)

  # Explode the array into explicitly named scalar columns, assemble them into
  # a vector, and keep only that vector so the index step stays cheap.
  component_cols = ["pca[{0}]".format(i) for i in range(n_components)]
  component_assembler = VectorAssembler(inputCols=component_cols, outputCol="pca_features")
  svd_df = component_assembler.transform(svd_df.select(
    *(svd_df["pca"].getItem(i).alias(name) for i, name in enumerate(component_cols))
  )).select("pca_features")

  df1_ci = with_column_index(original_df)
  df2_ci = with_column_index(svd_df)
  new_df = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').select("pca_features","label")

  return new_df



# Build the model-ready frames: train first, then test, each pairing its
# original frame with the matching SVD output array.
train_df_svd, test_df_svd = (
    refactor_df(source_df, reduced_arr)
    for source_df, reduced_arr in (
        (train_final_df, train_pca_features_arr),
        (test_final_df, test_pca_features_arr),
    )
)




In [4]:
#Save the data processed with SVD to a table
# mode("overwrite") replaces an existing table, so this cell can be re-run;
# the default save mode raises an error when the table already exists.
# NOTE(review): the table names say "600" while n is set to 450 earlier in
# this notebook -- confirm the intended name before downstream use.
train_df_svd.write.mode("overwrite").saveAsTable("train_svd_600_n")
test_df_svd.write.mode("overwrite").saveAsTable("test_svd_600_n")