# *Task 3 (N-Grams)*

In [1]:
# ### For Colab Only ###
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# !tar xf spark-2.4.5-bin-hadoop2.7.tgz
# !pip install -q findspark

# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [2]:
# ### For Colab Only ###
# from pydrive.auth import GoogleAuth
# from pydrive.drive import GoogleDrive
# from google.colab import auth
# from oauth2client.client import GoogleCredentials

# auth.authenticate_user()
# gauth = GoogleAuth()
# gauth.credentials = GoogleCredentials.get_application_default()
# drive = GoogleDrive(gauth)

In [3]:
# ### For Colab Only ###
# trainFile = drive.CreateFile({'id':"1axe3asDoDNJGQlv-iA9wp9laPtybEGzX"})
# trainFile.GetContentFile('train.csv')
# testFile = drive.CreateFile({'id':"1521WHxxG3SJHiPNknvuMcjepyoVY4X39"})
# testFile.GetContentFile('test.csv')

In [4]:
import os

import findspark

# Spark install location: honour SPARK_HOME when it is set (the Colab setup
# cell above exports it) and fall back to the course VM path otherwise.
SPARK_HOME = os.environ.get("SPARK_HOME", "/home/cse587/spark-2.4.0-bin-hadoop2.7")
findspark.init(SPARK_HOME)  ## SPARK Initialization

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

# Build the session first and reuse its context. Creating SparkContext() before
# the builder left appName("Genre Prediction") without effect (getOrCreate reuses
# the running context) and made a re-run of this cell fail with
# "Cannot run multiple SparkContexts at once"; getOrCreate is safe to re-run.
spark = SparkSession.builder.appName("Genre Prediction").getOrCreate()
sc = spark.sparkContext

In [5]:
import pandas as pd

# Raw training and test tables, read from the working directory.
data, test = (pd.read_csv(csv_name) for csv_name in ("train.csv", "test.csv"))

In [6]:
sqlCtx = SQLContext(sc)

# Convert both pandas frames into Spark DataFrames (train first, then test).
df, test_df = (spark.createDataFrame(frame) for frame in (data, test))

In [7]:
# Quick look at the training frame: its schema and the first row (truncated).
df.printSchema()
df.show(n=1)

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)

+--------+----------+--------------------+--------------------+
|movie_id|movie_name|                plot|               genre|
+--------+----------+--------------------+--------------------+
|23890098|Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
+--------+----------+--------------------+--------------------+
only showing top 1 row



In [8]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
from ast import literal_eval
import json

# UDF helper: parse a genre list stored as a Python-literal string,
# e.g. "['World cinema', 'Drama']" -> ['World cinema', 'Drama'].
def parse_array_from_string(x):
    """Parse a stringified list of genres into a list of stripped strings.

    The `genre` column holds Python list reprs, so `literal_eval` is tried
    first: it keeps names that contain apostrophes intact (Python quotes them
    with double quotes, e.g. "Children's") and turns "[]" into an empty list
    rather than ['']. Input that is not a literal list of strings falls back
    to stripping brackets/single quotes and splitting on commas.

    Args:
        x: the raw string; None or non-string values are treated as empty.

    Returns:
        list of str, with surrounding whitespace removed from each entry.
    """
    if not isinstance(x, str) or not x.strip():
        return []
    try:
        parsed = literal_eval(x)
    except (ValueError, SyntaxError):
        parsed = None
    if isinstance(parsed, (list, tuple)) and all(isinstance(item, str) for item in parsed):
        return [item.strip() for item in parsed]
    # Fallback for strings that are not valid Python literals (e.g. unquoted names).
    cleaned = x.replace('[', '').replace(']', '').replace("'", '')
    return [part.strip() for part in cleaned.split(',') if part.strip()]

# Spark UDF wrapper: string column -> array<string> of genre names.
retrieve_array = udf(f=parse_array_from_string, returnType=ArrayType(elementType=StringType()))

def lower_case(x):
    """Return a new list holding each string of `x` lower-cased, order kept."""
    return [item.lower() for item in x]

convert_to_lower = udf(lower_case, ArrayType(StringType()))

# `label` = parsed genre list from the raw `genre` string, lower-cased.
parsed_genres = retrieve_array(col("genre"))
df = df.withColumn("label", convert_to_lower(parsed_genres))

In [9]:
# Compare schemas (test has no genre/label), then peek at one labelled row.
for frame in (test_df, df):
    frame.printSchema()
df.show(n=1)

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- label: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------+----------+--------------------+--------------------+--------------------+
|movie_id|movie_name|                plot|               genre|               label|
+--------+----------+--------------------+--------------------+--------------------+
|23890098|Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[world cinema, dr...|
+--------+----------+--------------------+--------------------+--------------------+
only showing top 1 row



In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# One 0/1 integer flag column per target genre: 1 when the genre appears in
# the lower-cased `label` array, 0 otherwise.
def makingLabelsForLabelx(val, genre=None):
    """Return 1 if `genre` is an element of `val`, else 0.

    Args:
        val: list of genre strings (one row of `label`); None counts as empty.
        genre: genre to look for. Optional only for backward compatibility:
            when omitted, the module-level `word1` is used, as the original
            one-argument version did.

    Returns:
        int: 1 or 0.
    """
    if genre is None:
        genre = word1
    if val is None:
        return 0
    return 1 if genre in val else 0

genres = ['drama','comedy','romance film','thriller','action','world cinema','crime fiction','horror','black-and-white','indie','action/adventure',
'adventure','family film','short film','romantic drama','animation','musical','science fiction','mystery','romantic comedy']

for genre in genres:
    # Bind the genre as a lambda default argument so each UDF carries its own
    # target. The original read the reassigned global `word1`, which relied on
    # the value captured when each UDF happened to be serialized.
    labeling = udf(lambda val, target=genre: makingLabelsForLabelx(val, target), IntegerType())
    df = df.withColumn(genre, labeling("label"))

In [11]:
# First row with all genre flag columns.
df.show(n=1)

+--------+----------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|movie_name|                plot|               genre|               label|drama|comedy|romance film|thriller|action|world cinema|crime fiction|horror|black-and-white|indie|action/adventure|adventure|family film|short film|romantic drama|animation|musical|science fiction|mystery|romantic comedy|
+--------+----------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|23890098|Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[world cinema, dr..

In [12]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import lower, col

# Clean text: keep only ASCII letters and whitespace.
df = df.withColumn("text", regexp_replace("plot", "[^a-zA-Z\\s]", ""))
test_df = test_df.withColumn("text", regexp_replace("plot", "[^a-zA-Z\\s]", ""))

# Tokenize text. RegexTokenizer splits on runs of whitespace and lower-cases.
# Tokenizer splits on a single "\s", so the gaps left by removed punctuation
# (e.g. "a - b" -> "a  b") produced empty-string tokens that survived
# stop-word removal and leaked into the n-grams.
# NOTE: models cached under models/ were fitted on the old tokens; delete them to retrain.
tokenizer = RegexTokenizer(inputCol='text', outputCol='words_token',
                           pattern='\\s+', gaps=True, toLowercase=True)
df = tokenizer.transform(df)
test_df = tokenizer.transform(test_df)

# Remove stop words
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df = remover.transform(df)
test_df = remover.transform(test_df)


In [13]:
from pyspark.ml.feature import NGram

# Bigrams (NGram's default n=2) over the stop-word-filtered tokens.
# The train result gets its own name; the test frame is updated in place.
ngram = NGram(n=2, inputCol="words_clean", outputCol="ngrams")
ngramDataFrame, test_df = ngram.transform(df), ngram.transform(test_df)

In [14]:
import pathlib
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel

# Bag-of-n-grams vocabulary (at most 10000 terms): reuse the saved model when
# present, otherwise fit it on the training n-grams and save it.
model_path1 = "models/count-vectorizer-model"
file1 = pathlib.Path(model_path1)
if not file1.exists():
    print("count-vectorizer-model Doesnot Exist...Training Model")
    cv = CountVectorizer(inputCol="ngrams", outputCol="features", vocabSize=10000)
    model = cv.fit(ngramDataFrame)
    print("Saving Trained Model")
    model.save(model_path1)
else:
    print("count-vectorizer-model Exist")
    model = CountVectorizerModel.load(model_path1)

print("Transforming data")
wRescaledData, test_wRescaledData = (model.transform(frame) for frame in (ngramDataFrame, test_df))
print("Transforming data Done")



count-vectorizer-model Exist
Transforming data
Transforming data Done


In [15]:
# Per-genre binary training sets: the genre's 0/1 flag column becomes `label`,
# paired with the n-gram count features. Order matches wtrainDF1..wtrainDF20.
(
    wtrainDF1, wtrainDF2, wtrainDF3, wtrainDF4, wtrainDF5,
    wtrainDF6, wtrainDF7, wtrainDF8, wtrainDF9, wtrainDF10,
    wtrainDF11, wtrainDF12, wtrainDF13, wtrainDF14, wtrainDF15,
    wtrainDF16, wtrainDF17, wtrainDF18, wtrainDF19, wtrainDF20,
) = [
    wRescaledData.select(col(genre_name).alias("label"), "features")
    for genre_name in (
        "drama", "comedy", "romance film", "thriller", "action",
        "world cinema", "crime fiction", "horror", "black-and-white", "indie",
        "action/adventure", "adventure", "family film", "short film", "romantic drama",
        "animation", "musical", "science fiction", "mystery", "romantic comedy",
    )
]

In [16]:
# Test inputs for the per-genre models. Only the feature vector is kept, so
# movie_id is recovered later by joining back on `features`.
wtestDF = test_wRescaledData.select(["features"])

In [17]:
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Shared estimator (library defaults) reused to fit each genre's binary model.
lrm = LogisticRegression(featuresCol="features", labelCol="label")


In [18]:
# Genre 1 (drama): fit and cache the binary LR model unless a saved one exists.
model_path3 = "models/lr-model1"
file3 = pathlib.Path(model_path3)
if not file3.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel1 = lrm.fit(wtrainDF1)
    print("Saving Trained Model")
    wmodel1.save(model_path3)
else:
    print("LR-model Exist")
    wmodel1 = LogisticRegressionModel.load(model_path3)

wresult1 = wmodel1.transform(wtestDF)

LR-model Exist


In [19]:
# Genre 2 (comedy): fit and cache the binary LR model unless a saved one exists.
model_path4 = "models/lr-model2"
file4 = pathlib.Path(model_path4)
if not file4.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel2 = lrm.fit(wtrainDF2)
    print("Saving Trained Model")
    wmodel2.save(model_path4)
else:
    print("LR-model Exist")
    wmodel2 = LogisticRegressionModel.load(model_path4)

wresult2 = wmodel2.transform(wtestDF)

LR-model Exist


In [20]:
# Genre 3 (romance film): fit and cache the binary LR model unless a saved one exists.
model_path5 = "models/lr-model3"
file5 = pathlib.Path(model_path5)
if not file5.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel3 = lrm.fit(wtrainDF3)
    print("Saving Trained Model")
    wmodel3.save(model_path5)
else:
    print("LR-model Exist")
    wmodel3 = LogisticRegressionModel.load(model_path5)

wresult3 = wmodel3.transform(wtestDF)

LR-model Exist


In [21]:
# Genre 4 (thriller): fit and cache the binary LR model unless a saved one exists.
model_path6 = "models/lr-model4"
file6 = pathlib.Path(model_path6)
if not file6.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel4 = lrm.fit(wtrainDF4)
    print("Saving Trained Model")
    wmodel4.save(model_path6)
else:
    print("LR-model Exist")
    wmodel4 = LogisticRegressionModel.load(model_path6)

wresult4 = wmodel4.transform(wtestDF)

LR-model Exist


In [22]:
# Genre 5 (action): fit and cache the binary LR model unless a saved one exists.
model_path7 = "models/lr-model5"
file7 = pathlib.Path(model_path7)
if not file7.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel5 = lrm.fit(wtrainDF5)
    print("Saving Trained Model")
    wmodel5.save(model_path7)
else:
    print("LR-model Exist")
    wmodel5 = LogisticRegressionModel.load(model_path7)

wresult5 = wmodel5.transform(wtestDF)

LR-model Exist


In [23]:
# Genre 6 (world cinema): fit and cache the binary LR model unless a saved one exists.
model_path8 = "models/lr-model6"
file8 = pathlib.Path(model_path8)
if not file8.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel6 = lrm.fit(wtrainDF6)
    print("Saving Trained Model")
    wmodel6.save(model_path8)
else:
    print("LR-model Exist")
    wmodel6 = LogisticRegressionModel.load(model_path8)

wresult6 = wmodel6.transform(wtestDF)

LR-model Exist


In [24]:
# Genre 7 (crime fiction): fit and cache the binary LR model unless a saved one exists.
model_path9 = "models/lr-model7"
file9 = pathlib.Path(model_path9)
if not file9.exists():
    print("LR-model Doesnot Exist...Training Model")
    wmodel7 = lrm.fit(wtrainDF7)
    print("Saving Trained Model")
    wmodel7.save(model_path9)
else:
    print("LR-model Exist")
    wmodel7 = LogisticRegressionModel.load(model_path9)

wresult7 = wmodel7.transform(wtestDF)

LR-model Exist


In [25]:
# Genre 8 (horror): fit and cache the binary LR model unless a saved one exists.
model_path10 = "models/lr-model8"
file10 = pathlib.Path(model_path10)
if not file10.exists():
    wmodel8 = lrm.fit(wtrainDF8)
    wmodel8.save(model_path10)
else:
    wmodel8 = LogisticRegressionModel.load(model_path10)
wresult8 = wmodel8.transform(wtestDF)

In [26]:
# Genre 9 (black-and-white): fit and cache the binary LR model unless a saved one exists.
model_path11 = "models/lr-model9"
file11 = pathlib.Path(model_path11)
if not file11.exists():
    wmodel9 = lrm.fit(wtrainDF9)
    wmodel9.save(model_path11)
else:
    wmodel9 = LogisticRegressionModel.load(model_path11)
wresult9 = wmodel9.transform(wtestDF)

In [27]:
# Genre 10 (indie): fit and cache the binary LR model unless a saved one exists.
model_path12 = "models/lr-model10"
file12 = pathlib.Path(model_path12)
if not file12.exists():
    wmodel10 = lrm.fit(wtrainDF10)
    wmodel10.save(model_path12)
else:
    wmodel10 = LogisticRegressionModel.load(model_path12)
wresult10 = wmodel10.transform(wtestDF)

In [28]:
# Genre 11 (action/adventure): fit and cache the binary LR model unless a saved one exists.
model_path13 = "models/lr-model11"
file13 = pathlib.Path(model_path13)
if not file13.exists():
    wmodel11 = lrm.fit(wtrainDF11)
    wmodel11.save(model_path13)
else:
    wmodel11 = LogisticRegressionModel.load(model_path13)
wresult11 = wmodel11.transform(wtestDF)

In [29]:
# Genre 12 (adventure): fit and cache the binary LR model unless a saved one exists.
model_path14 = "models/lr-model12"
file14 = pathlib.Path(model_path14)
if not file14.exists():
    wmodel12 = lrm.fit(wtrainDF12)
    wmodel12.save(model_path14)
else:
    wmodel12 = LogisticRegressionModel.load(model_path14)
wresult12 = wmodel12.transform(wtestDF)

In [30]:
# Genre 13 (family film): fit and cache the binary LR model unless a saved one exists.
model_path15 = "models/lr-model13"
file15 = pathlib.Path(model_path15)
if not file15.exists():
    wmodel13 = lrm.fit(wtrainDF13)
    wmodel13.save(model_path15)
else:
    wmodel13 = LogisticRegressionModel.load(model_path15)
wresult13 = wmodel13.transform(wtestDF)

In [31]:
# Genre 14 (short film): fit and cache the binary LR model unless a saved one exists.
model_path16 = "models/lr-model14"
file16 = pathlib.Path(model_path16)
if not file16.exists():
    wmodel14 = lrm.fit(wtrainDF14)
    wmodel14.save(model_path16)
else:
    wmodel14 = LogisticRegressionModel.load(model_path16)
wresult14 = wmodel14.transform(wtestDF)

In [32]:
# Genre 15 (romantic drama): fit and cache the binary LR model unless a saved one exists.
model_path17 = "models/lr-model15"
file17 = pathlib.Path(model_path17)
if not file17.exists():
    wmodel15 = lrm.fit(wtrainDF15)
    wmodel15.save(model_path17)
else:
    wmodel15 = LogisticRegressionModel.load(model_path17)
wresult15 = wmodel15.transform(wtestDF)

In [33]:
# Genre 16 (animation): fit and cache the binary LR model unless a saved one exists.
model_path18 = "models/lr-model16"
file18 = pathlib.Path(model_path18)
if not file18.exists():
    wmodel16 = lrm.fit(wtrainDF16)
    wmodel16.save(model_path18)
else:
    wmodel16 = LogisticRegressionModel.load(model_path18)
wresult16 = wmodel16.transform(wtestDF)

In [34]:
# Genre 17 (musical): fit and cache the binary LR model unless a saved one exists.
model_path19 = "models/lr-model17"
file19 = pathlib.Path(model_path19)
if not file19.exists():
    wmodel17 = lrm.fit(wtrainDF17)
    wmodel17.save(model_path19)
else:
    wmodel17 = LogisticRegressionModel.load(model_path19)
wresult17 = wmodel17.transform(wtestDF)

In [35]:
# Genre 18 (science fiction): fit and cache the binary LR model unless a saved one exists.
model_path20 = "models/lr-model18"
file20 = pathlib.Path(model_path20)
if not file20.exists():
    wmodel18 = lrm.fit(wtrainDF18)
    wmodel18.save(model_path20)
else:
    wmodel18 = LogisticRegressionModel.load(model_path20)
wresult18 = wmodel18.transform(wtestDF)

In [36]:
# Genre 19 (mystery): fit and cache the binary LR model unless a saved one exists.
model_path21 = "models/lr-model19"
file21 = pathlib.Path(model_path21)
if not file21.exists():
    wmodel19 = lrm.fit(wtrainDF19)
    wmodel19.save(model_path21)
else:
    wmodel19 = LogisticRegressionModel.load(model_path21)
wresult19 = wmodel19.transform(wtestDF)

In [37]:
# Genre 20 (romantic comedy): fit and cache the binary LR model unless a saved one exists.
model_path22 = "models/lr-model20"
file22 = pathlib.Path(model_path22)
if not file22.exists():
    wmodel20 = lrm.fit(wtrainDF20)
    wmodel20.save(model_path22)
else:
    wmodel20 = LogisticRegressionModel.load(model_path22)
wresult20 = wmodel20.transform(wtestDF)

In [38]:
# Give each genre's prediction column a unique name (prediction1..prediction20)
# so the 20 result frames can be joined side by side.
_unrenamed = (
    wresult1, wresult2, wresult3, wresult4, wresult5,
    wresult6, wresult7, wresult8, wresult9, wresult10,
    wresult11, wresult12, wresult13, wresult14, wresult15,
    wresult16, wresult17, wresult18, wresult19, wresult20,
)
(
    wresult1, wresult2, wresult3, wresult4, wresult5,
    wresult6, wresult7, wresult8, wresult9, wresult10,
    wresult11, wresult12, wresult13, wresult14, wresult15,
    wresult16, wresult17, wresult18, wresult19, wresult20,
) = [
    frame.withColumnRenamed("prediction", "prediction%d" % idx)
    for idx, frame in enumerate(_unrenamed, start=1)
]
del _unrenamed
wresult1.printSchema()

root
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction1: double (nullable = false)



In [39]:
# Combine the 20 per-genre prediction frames into one frame keyed by the
# feature vector: columns features, prediction1 ... prediction20.
# Each right-hand frame is trimmed to (features, predictionK) before joining.
# The original did this only for the first 9 joins, so joins 10-19 dragged
# duplicate, ambiguously named rawPrediction/probability columns through
# every later shuffle.
# dropDuplicates keeps one row per vector; a fitted model maps identical
# vectors to identical predictions, so which duplicate survives does not matter.
# NOTE(review): joining on a vector column is costly; carrying movie_id through
# wtestDF and joining on it would be cheaper.
_genre_results = [
    wresult1, wresult2, wresult3, wresult4, wresult5,
    wresult6, wresult7, wresult8, wresult9, wresult10,
    wresult11, wresult12, wresult13, wresult14, wresult15,
    wresult16, wresult17, wresult18, wresult19, wresult20,
]
w = _genre_results[0].select("features", "prediction1")
for k, genre_result in enumerate(_genre_results[1:], start=2):
    w = w.join(genre_result.select("features", "prediction%d" % k), on=['features'], how='inner')
    w = w.dropDuplicates(['features'])
w = w.select("features", *["prediction%d" % k for k in range(1, 21)])
del _genre_results


In [40]:
test_wRescaledData.printSchema()

# Map each distinct feature vector back to every test movie that produced it,
# then keep a single row per movie.
test_wRescaledData_final = test_wRescaledData.select(["movie_id", "features"])
w = (
    w.join(test_wRescaledData_final, on=['features'], how='inner')
     .dropDuplicates(['movie_id'])
)

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- text: string (nullable = true)
 |-- words_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- features: vector (nullable = true)



In [41]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import math
def makingLabelsForLabelx(val1,val2,val3,val4,val5,val6,val7,val8,val9,val10,val11,val12,val13,val14,val15,val16,val17,val18,val19,val20):
    """Render 20 per-genre predictions as one space-separated string.

    Each value is floored and printed as an integer (a 1.0 prediction becomes
    "1"), in argument order, e.g. "1 0 0 1 ...".

    NOTE(review): this re-defines the earlier makingLabelsForLabelx used for the
    genre flag columns; a distinct name would be clearer.
    """
    predictions = (val1, val2, val3, val4, val5, val6, val7, val8, val9, val10,
                   val11, val12, val13, val14, val15, val16, val17, val18, val19, val20)
    return " ".join(str(math.floor(p)) for p in predictions)

wlabeling = udf(makingLabelsForLabelx, StringType())

# Feed prediction1 ... prediction20 (in order) to the string-building UDF.
_prediction_cols = ["prediction%d" % k for k in range(1, 21)]
wdf_pred = w.withColumn("predictions", wlabeling(*_prediction_cols))

In [42]:
# Submission shape: one row per test movie with its genre prediction string.
wdf_final = wdf_pred.select(["movie_id", "predictions"])

In [43]:
# Preview the first 20 prediction rows.
wdf_final.show(n=20)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
|15739815|1 1 0 0 0 0 0 0 0...|
|11272068|0 0 0 1 1 0 0 0 0...|
|20312589|1 0 1 0 0 1 1 0 1...|
| 4635580|1 0 0 0 0 0 0 0 0...|
|23658166|1 1 0 0 0 0 0 0 1...|
| 1428872|0 1 0 0 1 0 0 0 0...|
|   62693|1 0 0 1 1 0 0 0 1...|
|19286389|1 0 1 0 0 0 0 0 0...|
| 7003785|0 1 0 0 1 0 1 0 0...|
| 5565692|0 1 1 0 0 1 0 0 1...|
|33916241|0 1 0 0 0 0 0 0 0...|
| 1356971|1 1 1 1 0 0 0 0 0...|
|35968313|1 0 0 0 0 0 0 0 0...|
|  296252|0 0 0 1 1 0 0 0 0...|
|24817139|0 1 0 1 1 1 0 0 0...|
|23863620|1 0 1 0 0 0 1 1 0...|
|24003057|0 0 0 0 1 0 0 0 0...|
| 2268290|0 0 0 1 1 0 0 1 0...|
|13949859|1 0 0 1 0 0 0 0 0...|
|24225138|1 1 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 20 rows



In [44]:
# Write the predictions as a single CSV part file with a header row.
# mode("overwrite") keeps the notebook re-runnable: the default save mode
# raises when "results_task3" already exists. "csv" is Spark 2.x's built-in
# source; "com.databricks.spark.csv" is only a legacy alias for it.
wdf_final.repartition(1).write.mode("overwrite").option("header", "true").csv("results_task3")

In [45]:
# Number of test movies that received a prediction row.
num_predicted_movies = wdf_final.count()
num_predicted_movies

7777