In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [3]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import col
import pandas as pd
import numpy as np

#prepocessing the data
print("begin to processing the raw data...")
spark = SparkSession.builder.master("local[*]").getOrCreate()
print("processing the train data...")
pd_train = pd.read_csv('/train.csv')
train_df = spark.createDataFrame(pd_train)
print("processing the mapping data...")
mp_df = spark.read.csv('/mapping.csv', header = True)
mapping_df = mp_df.select(col("_c0").alias("indexes"),col("0").alias("genres"))
mapping = {}
genreList = []
for row in mapping_df.collect():
  # print(row)
  indexes = row["indexes"]
  mapping[row['genres']] = indexes
  genreList.append(row['genres'])
# print(mapping)
print("processing the test data...")
pd_test = pd.read_csv('/test.csv')
test_df = spark.createDataFrame(pd_test)

train_df.show()
mp_df.show()
test_df.show()

begin to processing the raw data...
processing the train data...
processing the mapping data...
processing the test data...
+--------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               genre|
+--------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|
|  595909|   A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|
| 5272176|            End Game|The president is ...|['Action/Adventur...|
| 1952976|          Dark Water|{{plot}} The film...|['Thriller', 'Dra...|
|24225279|                Sing|The story begins ...|           ['Drama']|
| 2462689|       Meet John Doe|Infuriated at bei...|['Black-an

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer,CountVectorizer,HashingTF
from pyspark.sql.types import IntegerType


# RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. 
# reference: https://spark.apache.org/docs/latest/ml-features#tokenizer
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="tokens", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

# countTokens = udf(lambda words: len(words), IntegerType())

# regexTokenized = regexTokenizer.transform(sentenceDataFrame)
# regexTokenized.select("plot", "words") \
#     .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

# words which should be excluded from the input
hashingTF = HashingTF(inputCol=regexTokenizer.getOutputCol(), outputCol="features")
# remover= StopWordsRemover(inputCol="tokens", outputCol="stopRemove")
# remover.transform(sentenceData).show(truncate=False)

# fit a CountVectorizerModel from the corpus.
# cv = CountVectorizer(inputCol="rawfeatures", outputCol="features")

# run a sequence of algorithms to process and learn from data
# reference: https://spark.apache.org/docs/latest/ml-pipeline.html
pipeline = Pipeline(stages=[regexTokenizer,hashingTF])
# Fit the pipeline to documents.
model_train = pipeline.fit(train_df)
model_test = pipeline.fit(test_df)
# Make predictions on documents.
prediction_train = model_train.transform(train_df)
prediction_test = model_train.transform(test_df)

In [5]:
prediction_train.show()
prediction_test.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               genre|              tokens|            features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, a, hard...|(262144,[2437,127...|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|[the, nation, of,...|(262144,[991,1739...|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|[poovalli, induch...|(262144,[119,571,...|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|[the, lemon, drop...|(262144,[619,1998...|
|  595909|   A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|[seventh, day, ad...|(262144,[1911,243...|
| 5272176|            End Game|The president is ...|['Action/Adventur...

In [6]:
from pyspark.mllib.linalg import Matrix, Matrices

categories = mapping_df.count()
train_num = prediction_train.count()
# actually it is a list
feature_matrix=[0]*categories*train_num
line=-1
print("processing to remove symbol and get genres of train data")
for row in prediction_train.collect():
  line+=1
  # print(row)
  genres = row["genre"]
  # print(genres)
  try:
    if genres[0] == '[':
      genres = genres.replace('[','')
      genres = genres.replace(']','')
      genres = genres.replace('"','')
      genres = genres.replace("'","")
      genres = genres.split(',')
      rowlen = 0
      for genre in genres:
        # every row 
        rowlen = rowlen + categories
        genre = genre.strip()
        feature_matrix[int(mapping[genre])+line+rowlen] = 1
  except TypeError:
    print("processing genres failed")
print(feature_matrix)
dense_matric = Matrices.dense(train_num,categories,feature_matrix)
print(dense_matric)

  

processing to remove symbol and get genres of train data
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

In [7]:
feature_f = dense_matric.toArray().tolist()
data_feature = spark.createDataFrame(feature_f,[genreList[0],genreList[1],genreList[2],genreList[3],genreList[4],genreList[5],genreList[6],genreList[7],genreList[8],genreList[9],genreList[10],genreList[11],genreList[12],genreList[13],genreList[14],genreList[15],genreList[16],genreList[17],genreList[18],genreList[19]])
data_feature.show()

+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|  0.0|   1.0|         0.0|     0.0|   0.0|         0.0|          0.0|   0.0|            0.0|  0.0|             0.0|      0.0|        0.0|       0.0|           0.0|      0.0|    0.0|            0.0|    0.0|            0.0|
|  0.0|   0.0|         0.0|     0.0|   0.0|         0.0|          0.0|   0.0|            0.0|  0.0|         

In [8]:
%%time
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import monotonically_increasing_id,concat_ws
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, ArrayType
from functools import reduce


print("processing feature and data...")
data_feature_r = data_feature.withColumn("id", monotonically_increasing_id())
train_data = prediction_train.withColumn("id", monotonically_increasing_id())
train_modf = train_data.join(data_feature_r,"id","outer").drop("id")
train_modf.show()
train_modf = train_modf.filter(" COALESCE(movie_id, movie_name, plot, genre) IS NOT NULL")
train_modf = train_modf.filter(" COALESCE( tokens ) IS NOT NULL")
train_modf = train_modf.filter(" COALESCE( features ) IS NOT NULL")
train_modf = train_modf.na.drop()
train_modf = train_modf.where(col("features").isNotNull())
trainFinal = train_modf.fillna(1)

#testMatrix = Matrices.dense(test_data.count(),len(test_data.columns),testArr)
ind=-1
predict= []
print("begain ml methold...")
for i in mapping_df.collect():
  ind+=1
  col = genreList[ind]
  lr = LogisticRegression(featuresCol="features", labelCol=col,family="multinomial")
  lrModel = lr.fit(trainFinal)
  predictions=lrModel.transform(prediction_test)
  predictions = predictions.withColumn("prediction", F.col("prediction").cast(IntegerType()))
  predict.append(predictions.select("movie_id","prediction"))

print(predict)


processing feature and data...
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|          movie_name|                plot|               genre|              tokens|            features|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------

In [9]:
%%time
print("final processing...")
predict_r = [df.selectExpr('movie_id', f'prediction as prediction_{i}') for i, df in enumerate(predict)]
temp_df = reduce(lambda x, y: x.join(y, ['movie_id'], how='full'), predict_r)
col_list = ['prediction_%d' % i for i in range(len(predict))]
print("save the file...")
temp_df = temp_df.withColumn('predictions',concat_ws(" ",*col_list)).drop(*col_list).toPandas().to_csv("part2.csv",index=False)




final processing...
save the file...
CPU times: user 587 ms, sys: 132 ms, total: 719 ms
Wall time: 1min 57s
