In [18]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer 
from pyspark.ml.feature import HashingTF, IDF,Word2Vec
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql.functions import when, lit,udf,col
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.mllib.classification import LogisticRegressionWithSGD,LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
import pandas as pd
import csv
from csv import reader,writer
import pickle 

In [19]:
# Create (or reuse) the Spark session for this notebook. The former
# .config("spark.some.config.option", "some-value") line was a placeholder copied
# from the Spark docs and configured nothing, so it is dropped.
spark = SparkSession.builder.appName("Model1").getOrCreate()

# Let Spark use Apache Arrow when converting pandas DataFrames (spark.createDataFrame).
# NOTE(review): Spark 3.x renamed this key to "spark.sql.execution.arrow.pyspark.enabled";
# the old name still works there but is deprecated — confirm the target Spark version.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [20]:
# Load the train/test CSVs and the genre mapping with pandas, then hand them to Spark.
# All three files live in one directory: change DATA_DIR to run on another machine.
import os

DATA_DIR = '/Users/anand/Downloads/UB/Sem2/DIc/A3/dic487-587'

data = pd.read_csv(os.path.join(DATA_DIR, 'train.csv'))
data_test = pd.read_csv(os.path.join(DATA_DIR, 'test.csv'))
df2 = spark.createDataFrame(data)        # training movies (Spark)
df3 = spark.createDataFrame(data_test)   # test movies (Spark)
# Spark DataFrame; presumably column "0" holds genre names and "Unnamed: 0" their ids
# (that is how the lookup dict is built later) — verify against mapping.csv.
mapping = spark.createDataFrame(pd.read_csv(os.path.join(DATA_DIR, 'mapping.csv')))

In [21]:
# Tokenize each plot by splitting it on non-word characters (regex \W) into "words".
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")

# English stop-word list; loadDefaultStopWords is a static helper, so call it on the class
# rather than on a throwaway transformer instance.
stopwordsRemoverList = StopWordsRemover.loadDefaultStopWords("english")
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=stopwordsRemoverList,
)

# Bag-of-words term counts over the filtered tokens -> "features"
# (vocabulary capped at 10000 terms, minDF=5).
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=10000,
    minDF=5,
)

In [30]:
# Hashed term frequencies ("rawFeatures") re-weighted by IDF into "featuresidf".
# NOTE(review): numFeatures=20 hashes every term into only 20 buckets, and this stage
# reads the unfiltered "words" column rather than "filtered" — confirm both are intended.
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=20,
)
idf = IDF(
    inputCol="rawFeatures",
    outputCol="featuresidf",
)

In [23]:
# Driver-side lookup: value of column "0" (genre name) -> value of "Unnamed: 0" (genre id).
# RDD.collectAsMap is just dict(collect()), so build the dict directly.
mg = dict(mapping.select("0", "Unnamed: 0").collect())

# Number of genres in the mapping. NOTE: this shadows pyspark.sql.functions.count,
# which the star import brought into scope.
count = mapping.count()
def label_to_id(g, genre_to_id=None):
    """Convert a stringified genre list into a sorted list of integer genre ids.

    ``g`` is a raw ``genre`` cell such as ``"['World cinema', 'Drama']"``. Each genre
    name is looked up in ``genre_to_id``.

    Args:
        g: String holding a Python list literal of genre names, or None.
        genre_to_id: Optional name -> id dict. When omitted, the module-level ``mg``
            lookup is used, so the one-argument Spark UDF call keeps working.

    Returns:
        Sorted list of integer ids. Names absent from the mapping are skipped because
        they cannot be encoded as a label; a null or empty cell yields ``[]``.
    """
    import ast

    lookup = mg if genre_to_id is None else genre_to_id
    if g is None:
        return []
    # Parse the list literal rather than splitting on ",": a plain split breaks genre
    # names that themselves contain a comma.
    names = ast.literal_eval(g)
    # Skip unknown names: a None id would make sort() raise TypeError in Python 3 and
    # would crash the one-hot step afterwards.
    ids = [lookup[name] for name in names if name in lookup]
    ids.sort()
    return ids
# Run label_to_id as a Spark UDF (array<int> result) and store the genre ids in "lab".
udff = UserDefinedFunction(label_to_id, ArrayType(IntegerType()))
df2 = df2.withColumn("lab", udff(col("genre")))

+--------+--------------------+--------------------+--------------------+----------------+
|movie_id|          movie_name|                plot|               genre|             lab|
+--------+--------------------+--------------------+--------------------+----------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|          [0, 5]|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|  [0, 4, 10, 17]|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|      [0, 4, 16]|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|             [1]|
|  595909|   A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|       [0, 5, 6]|
| 5272176|            End Game|The president is ...|['Action/Adventur...|   [0, 3, 4, 10]|
| 1952976|          Dark Water|{{plot}} The film...|['Thriller', 'Dra...|       [0, 3, 7]|
|24225279|                Sing|The story begins ...|           ['Drama']|             [0]|

In [24]:
# Convert the mapped genre ids into a fixed-length multi-hot (0/1 per genre) vector.
def one_hot_list(g, num_labels=None):
    """Turn a list of genre ids into a fixed-length 0/1 vector.

    Args:
        g: Iterable of integer genre ids (the ``lab`` column), or None.
        num_labels: Length of the output vector. When omitted, the module-level
            ``count`` (number of genres) is used, so the one-argument Spark UDF call
            keeps working.

    Returns:
        List of ``num_labels`` ints with 1 at every index listed in ``g`` and 0
        elsewhere. A null ``g`` yields all zeros.
    """
    size = count if num_labels is None else num_labels
    x = [0] * size
    for i in (g or []):
        x[i] = 1
    return x
# Expand each row's genre-id list ("lab") into a 0/1 vector stored as "label".
udff = UserDefinedFunction(one_hot_list, ArrayType(IntegerType()))
traindata = df2.withColumn("label", udff(col("lab")))

+--------+--------------------+--------------------+--------------------+----------------+--------------------+
|movie_id|          movie_name|                plot|               genre|             lab|               label|
+--------+--------------------+--------------------+--------------------+----------------+--------------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|          [0, 5]|[1, 0, 0, 0, 0, 1...|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|  [0, 4, 10, 17]|[1, 0, 0, 0, 1, 0...|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|      [0, 4, 16]|[1, 0, 0, 0, 1, 0...|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|             [1]|[0, 1, 0, 0, 0, 0...|
|  595909|   A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|       [0, 5, 6]|[1, 0, 0, 0, 0, 1...|
| 5272176|            End Game|The president is ...|['Action/Adventur...|   [0, 3, 4, 10]|[1, 0, 0, 1, 1

In [33]:
# Model 2 feature pipeline: tokenize -> drop stop words -> term counts ("features"),
# followed by hashed TF-IDF ("featuresidf").
# NOTE(review): the training cell only reads "features"; the HashingTF/IDF stages look
# unused there — confirm before removing them.
feature_stages = [regexTokenizer, stopwordsRemover, countVectors, hashingTF, idf]
pipeline = Pipeline(stages=feature_stages)

In [34]:
# Fit the feature pipeline on the TRAINING plots only. The test set must go through this
# same fitted pipeline: fitting a second pipeline on the test data learns a different
# CountVectorizer vocabulary, so feature index k would stand for different words in train
# and test, and the trained models' weights would be applied to the wrong terms.
pipelineFit = pipeline.fit(df2)
pipelineFit2 = pipelineFit  # same fitted model, kept under the old name for later cells

In [35]:
# Run the fitted feature pipelines: labelled training rows, then the test rows.
dataset, testdataset = (
    pipelineFit.transform(traindata),
    pipelineFit2.transform(df3),
)

In [36]:
# Train one binary LogisticRegressionWithLBFGS model per genre (one-vs-rest), pickle
# each model, and collect its 0/1 prediction for every test movie.
#
# The MLlib-format features are built once and cached. Otherwise every genre's training
# pass and prediction pass would recompute the whole feature pipeline from the raw plots.
train_features = dataset.rdd.map(
    lambda row: (row.label, MLLibVectors.fromML(row.features))).cache()
test_features = testdataset.rdd.map(
    lambda row: (row.movie_id, MLLibVectors.fromML(row.features))).cache()


def make_prep(label_idx):
    """Return a mapper (label_list, vector) -> LabeledPoint for genre ``label_idx``."""
    def prep(pair):
        labels, vec = pair
        return LabeledPoint(labels[label_idx], vec)
    return prep


pred_raw = []
for i in range(0, count):
    # Bind the genre index explicitly instead of closing over the global loop variable.
    prepd_data = train_features.map(make_prep(i))
    model = LogisticRegressionWithLBFGS.train(prepd_data)
    with open('task2model' + str(i) + '.pkl', 'wb') as fid:
        pickle.dump(model, fid)
    # Bind this iteration's model as a default argument for the same reason.
    labelsAndPreds = test_features.map(lambda p, m=model: (p[0], m.predict(p[1])))
    # pred_raw[i] is the list of (movie_id, prediction) pairs for genre i.
    pred_raw.append(labelsAndPreds.collect())

train_features.unpersist()
test_features.unpersist()

In [37]:
# Output schema: one row per test movie with its space-separated genre predictions.
column_name = ["movie_id", "predictions"]
# Empty frame that already carries those two columns.
data = pd.DataFrame(columns=column_name)

In [38]:
# Build one output row per test movie: its id plus a space-separated string of 0/1
# predictions ordered by genre id (pred_raw[i] holds the predictions for genre i).
x = pred_raw
ids = []
rows = []
for j, (movie_id, _) in enumerate(x[0] if x else []):
    ids.append(str(movie_id))
    preds = []
    for i in range(0, count):
        pred_movie_id, pred = x[i][j]
        # The per-genre lists are paired by position; fail loudly if they are not in
        # the same row order instead of attaching one movie's predictions to another.
        if pred_movie_id != movie_id:
            raise ValueError(
                "prediction lists are misaligned at row %d: %r != %r"
                % (j, pred_movie_id, movie_id))
        # int() directly (not int(str(...))) so float predictions such as 1.0 also work.
        preds.append(str(int(pred)))
    rows.append([ids[j], ' '.join(preds)])

# Build the frame once: DataFrame.append in a loop is quadratic and was removed in pandas 2.0.
data = pd.DataFrame(rows, columns=column_name)

In [39]:
# Write the submission file. index=False keeps pandas' row index from being written as
# an extra unnamed leading column that carries no information.
data.to_csv('/Users/anand/Downloads/UB/Sem2/DIc/A3/result732.csv', index=False)