In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
import pyspark

#### Location of files

In [7]:
file_names=['movies_metadata.csv','ratings_small.csv','links_small.csv','keywords.csv']
folder_name='./data/movies_40k'
comp='./data/movies_40k.zip'

### Creating Spark Session object

In [8]:
warnings.filterwarnings('ignore')
conf=pyspark.SparkConf().setMaster('local[*]')
spark=pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

### Loading up data

To prevent data from shuffling we first load data through pandas and then convert it into pyspark dataframe

In [9]:
req_columns=['id','title','release_date','genres','vote_average','popularity','vote_count','imdb_id']
df_pd=pd.read_csv(folder_name+'/'+file_names[0])
df_pd=df_pd[req_columns]

### Creating schema for our data using Struct Type

In [10]:
from pyspark.sql.types import *
struct=[StructField(req_columns[0],StringType(),False),StructField(req_columns[1],StringType(),True),\
       StructField(req_columns[2],StringType(),True),StructField(req_columns[3],StringType(),True),\
       StructField(req_columns[4],DoubleType(),True),StructField(req_columns[5],DoubleType(),True),\
       StructField(req_columns[6],FloatType(),True),StructField(req_columns[7],StringType(),True)]
schema=StructType(fields=struct)
df=spark.createDataFrame(df_pd,schema=schema,verifySchema=False)
df=df.withColumn('id',df.id.cast('int'))
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- vote_count: float (nullable = true)
 |-- imdb_id: string (nullable = true)



### Checking number of partitions of created dataframe

In [11]:
print('Number of Partitions: ',df.rdd.getNumPartitions())

Number of Partitions:  8


### Checking NaN values

In [12]:
print('Counting na values using pandas : \n',df_pd.isna().sum())

Counting na values using pandas : 
 id               0
title            6
release_date    87
genres           0
vote_average     6
popularity       5
vote_count       6
imdb_id         17
dtype: int64


In [13]:
from pyspark.sql.functions import isnan
count=lambda x:{col: df.filter(isnan(df[col])).count() for col in x}

print('Counting na values using pyspark :\n',count(df.columns))

Counting na values using pyspark :
 {'id': 0, 'title': 6, 'release_date': 87, 'genres': 0, 'vote_average': 6, 'popularity': 5, 'vote_count': 6, 'imdb_id': 17}


### Filtering data

In [15]:
df=df.filter(~(isnan(df.imdb_id) | isnan(df.title) | isnan(df.release_date)))

### Preparing data

In [17]:
from pyspark.sql.functions import udf

@udf
def extract_genre(row):
    row=eval(row)
    genre=''
    for dic in row:
        genre+=dic['name']+'-' 
    return genre[0:-1]
df=df.withColumn('genres',extract_genre(df.genres))

In [18]:
date=udf(lambda x: eval(x.split('-')[0]),'int') 
df=df.withColumn('release_date',date(df.release_date))

### Check for duplicate ids

In [19]:
try:
    assert df.select('id').count()==df.select('id').distinct().count()
    assert df.select('imdb_id').count()==df.select('imdb_id').distinct().count()
except:
    df=df.dropDuplicates(['id','imdb_id'])
    
df.write.csv('./movies_df.csv',sep=',',mode='overwrite',header=True)

### Working with SQL queries

In [20]:
df=spark.read.csv('./movies_df.csv',header=True,inferSchema=True)
df.createTempView('mov')
spark.sql('SELECT * FROM mov where lower(title) like "ted"').show()

+-----+-----+------------+--------------+------------+----------+----------+---------+
|   id|title|release_date|        genres|vote_average|popularity|vote_count|  imdb_id|
+-----+-----+------------+--------------+------------+----------+----------+---------+
|72105|  Ted|        2012|Comedy-Fantasy|         6.3| 19.638605|    4811.0|tt1637725|
+-----+-----+------------+--------------+------------+----------+----------+---------+

