In [22]:
import findspark
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, regexp_replace
import requests
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import functions as F


In [2]:
findspark.init()
findspark.find()


'C:\\Users\\HamzaMOUMINE\\Documents\\non_adm_software\\anaconda3\\envs\\workspace\\Lib\\site-packages\\pyspark'

In [3]:
from pyspark.sql import SparkSession


In [4]:
spark = SparkSession.builder.appName('domain.com').getOrCreate()


In [5]:
base_path="C:/Users/HamzaMOUMINE/Documents/data/films"
## Download dataset url: https://datasets.imdbws.com/

In [6]:
data_file="data.tsv"

In [7]:
folders=["name.basics.tsv", "title.akas.tsv", "title.basics.tsv", "title.crew.tsv", "title.episode.tsv", "title.principals.tsv", "title.ratings.tsv"]

In [8]:
files=[]
for folder in folders :
    files.append(base_path + '/' + folder + '/' + data_file)

In [9]:
name_basics = spark.read.csv(files[0], sep=r'\t', header=True, inferSchema=True)


In [10]:
title_akas = spark.read.csv(files[1], sep=r'\t', header=True, inferSchema=True)

In [11]:
title_basics = spark.read.csv(files[2], sep=r'\t', header=True, inferSchema=True)

In [12]:
title_crew = spark.read.csv(files[3], sep=r'\t', header=True, inferSchema=True)

In [13]:
title_episode = spark.read.csv(files[4], sep=r'\t', header=True, inferSchema=True)

In [14]:
title_principals = spark.read.csv(files[5], sep=r'\t', header=True, inferSchema=True)

In [15]:
title_ratings = spark.read.csv(files[6], sep=r'\t', header=True, inferSchema=True)

In [16]:
datas = [name_basics, title_akas, title_basics, title_crew, title_episode, title_principals, title_ratings]

In [17]:
for data, filename in zip(datas, folders):
    print(filename)
    data.show(1)

name.basics.tsv
+---------+------------+---------+---------+--------------------+--------------------+
|   nconst| primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+------------+---------+---------+--------------------+--------------------+
|nm0000001|Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0072308,tt00531...|
+---------+------------+---------+---------+--------------------+--------------------+
only showing top 1 row

title.akas.tsv
+---------+--------+----------+------+--------+-----------+----------+---------------+
|  titleId|ordering|     title|region|language|      types|attributes|isOriginalTitle|
+---------+--------+----------+------+--------+-----------+----------+---------------+
|tt0000001|       1|Карменсіта|    UA|      \N|imdbDisplay|        \N|              0|
+---------+--------+----------+------+--------+-----------+----------+---------------+
only showing top 1 row

title.basics.tsv
+---------+---------+------------+

In [18]:
for data in datas:
    data.withColumn('numNulls', sum(data[col].isNull().cast('int') for col in data.schema.names)).orderBy(col("numNulls").desc()).show(1)

+---------+------------+---------+---------+-----------------+--------------------+--------+
|   nconst| primaryName|birthYear|deathYear|primaryProfession|      knownForTitles|numNulls|
+---------+------------+---------+---------+-----------------+--------------------+--------+
|nm2917693|Noah Johnson|       \N|       \N|             null|tt11337960,tt0379713|       1|
+---------+------------+---------+---------+-----------------+--------------------+--------+
only showing top 1 row

+---------+--------+--------------------+------+--------+-----+----------+---------------+--------+
|  titleId|ordering|               title|region|language|types|attributes|isOriginalTitle|numNulls|
+---------+--------+--------------------+------+--------+-----+----------+---------------+--------+
|tt3880980|       1|マックのハッスル刑事	JP	ja	...|  null|    null| null|      null|           null|       5|
+---------+--------+--------------------+------+--------+-----+----------+---------------+--------+
only showin

In [24]:
## Construct indexed dataframe to use chunks
indexed_title_akas = title_akas.withColumn("index", monotonically_increasing_id())

+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+-----+
|  titleId|ordering|               title|region|language|      types|          attributes|isOriginalTitle|index|
+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+-----+
|tt0000001|       1|          Карменсіта|    UA|      \N|imdbDisplay|                  \N|              0|    0|
|tt0000001|       2|          Carmencita|    DE|      \N|         \N|       literal title|              0|    1|
|tt0000001|       3|Carmencita - span...|    HU|      \N|imdbDisplay|                  \N|              0|    2|
|tt0000001|       4|          Καρμενσίτα|    GR|      \N|imdbDisplay|                  \N|              0|    3|
|tt0000001|       5|          Карменсита|    RU|      \N|imdbDisplay|                  \N|              0|    4|
|tt0000001|       6|          Carmencita|    US|      \N|imdbDisplay|                  \N|      

In [None]:
## Lighten ids to integers using chunked loops
id=0
max_num = indexed_title_akas.count()
part = 1000
cpt=0
while cpt < max_num:
    for i in indexed_title_akas.filter(col("index").between(cpt, cpt+part)).select(col("titleId")).collect():
        c_title_akas = title_akas.withColumn("titleId", regexp_replace("titleId", i["titleId"], str(id)))
        c_title_episode = title_episode.withColumn("tconst", regexp_replace("tconst", i["titleId"], str(id)))
        c_title_episode = c_title_episode.withColumn("parentTconst", regexp_replace("parentTconst", i["titleId"], str(id)))
        c_title_basics = title_basics.withColumn("tconst", regexp_replace("tconst", i["titleId"], str(id)))
        c_title_crew = title_crew.withColumn("tconst", regexp_replace("tconst", i["titleId"], str(id)))
        c_title_principals = title_principals.withColumn("tconst", regexp_replace("tconst", i["titleId"], str(id)))
        c_title_ratings = title_ratings.withColumn("tconst", regexp_replace("tconst", i["titleId"], str(id)))
        id = id + 1
    if cpt + part >= max_num:
        size = max_num - cpt - 1
    cpt = cpt + part

In [None]:
## Comma strings to arrays
a_name_basics = name_basics.select(split(col("knownForTitles"),",").alias("knownForTitles"), split(col("primaryProfession"),",").alias("primaryProfession"), col("deathYear"), col("birthYear"), col("primaryName"), col("nconst"))
a_title_principals = c_title_principals.select(split(col("characters"), ",").alias("characters"), col("job"), col("category"), col("nconst"), col("ordering"), col("tconst"))
a_title_basics = c_title_basics.select(split(col("genres"), ",").alias("genres"), col("runtimeMinutes"), col("endYear"), col("startYear"), col("isAdult"), col("originalTitle"), col("primaryTitle"), col("titleType"), col("tconst"))
a_title_crew = c_title_crew.select(split(col("directors"), ",").alias("directors"), split(col("writers"), ",").alias("writers"), col("tconst")) 

In [147]:
# Replace unuseful strings with blank
name_basics_clean = name_basics.na.fill(value = "", subset = ["primaryProfession"]).withColumn('deathYear', regexp_replace('deathYear', "\\\\N", '')).withColumn('birthYear', regexp_replace('birthYear', "\\\\N", '')).withColumn('primaryProfession', regexp_replace('primaryProfession', 'null', '')).withColumn('knownForTitles', regexp_replace('knownForTitles', '\\\\N', ''))
#name_basics_clean.withColumn('numNulls', sum(name_basics_clean[col].like('\\N').cast('int') for col in name_basics_clean.schema.names)).orderBy(col("numNulls").desc()).show(1)

In [148]:
# Replace unuseful strings with blank
title_akas_clean = c_title_akas.na.fill(value = "", subset = ["region", "language", "types", "attributes", "isOriginalTitle"]).withColumn('attributes', regexp_replace('attributes', "\\\\N", '')).withColumn('language', regexp_replace('language', "\\\\N", '')).withColumn('types', regexp_replace('types', "\\\\N", '')).withColumn('region', regexp_replace('region', "\\\\N", '')).withColumn('isOriginalTitle', regexp_replace('isOriginalTitle', "\\\\N", ''))

In [149]:
# Replace unuseful strings with blank
title_basics_clean = c_title_basics.na.fill(value = "", subset = ["genres"]).withColumn('isAdult', regexp_replace('isAdult', "\\\\N", '')).withColumn('runtimeMinutes', regexp_replace('runtimeMinutes', "\\\\N", '')).withColumn('genres', regexp_replace('genres', "\\\\N", '')).withColumn('startYear', regexp_replace('startYear', "\\\\N", '')).withColumn('endYear', regexp_replace('endYear', "\\\\N", ''))
#title_basics_clean.withColumn('numNulls', sum(title_basics_clean[col].like('\\N').cast('int') for col in title_basics_clean.schema.names)).orderBy(col("numNulls").desc()).show(1)

In [150]:
# Replace unuseful strings with blank
title_crew_clean = c_title_crew.withColumn('writers', regexp_replace('writers', "\\\\N", '')).withColumn('directors', regexp_replace('directors', "\\\\N", ''))
#title_crew_clean.withColumn('numNulls', sum(title_crew_clean[col].like('\\N').cast('int') for col in title_crew_clean.schema.names)).orderBy(col("numNulls").desc()).show(1)

In [151]:
# Replace unuseful strings with blank
title_episode_clean = c_title_episode.withColumn('seasonNumber', regexp_replace('seasonNumber', "\\\\N", '')).withColumn('episodeNumber', regexp_replace('episodeNumber', "\\\\N", ''))
#title_episode_clean.withColumn('numNulls', sum(title_episode_clean[col].like('\\N').cast('int') for col in title_episode_clean.schema.names)).orderBy(col("numNulls").desc()).show(1)

In [152]:
# Replace unuseful strings with blank
title_principals_clean = c_title_principals.withColumn('job', regexp_replace('job', "\\\\N", '')).withColumn('characters', regexp_replace('characters', "\\\\N", ''))

In [153]:
# Replace unuseful strings with blank
title_ratings_clean = c_title_ratings

In [154]:
# Prepare data in array
prepared_data = [name_basics_clean, title_akas_clean, title_basics_clean, title_crew_clean, title_episode_clean, title_principals_clean, title_ratings_clean]

In [155]:
# Prepare endpoints's url
base_url = "http://localhost"
endpoints = [":8090/name-basics", ":8091/title-akas", ":8092/title-basics", ":8093/title-crew", ":8094/title-episode", ":8095/title-principals", ":8096/title-ratings"]

In [156]:
## Construct json and post result to api
for p_data, endpoint in zip(prepared_data, endpoints):
    data = p_data.toJSON()
    print(data.first())
    #requests.post(url = base_url + endpoint, data = data.first())

{"nconst":"nm0000001","primaryName":"Fred Astaire","birthYear":"1899","deathYear":"1987","primaryProfession":"soundtrack,actor,miscellaneous","knownForTitles":"tt0072308,tt0053137,tt0031983,tt0050419"}
{"titleId":"tt0000001","ordering":1,"title":"Карменсіта","region":"UA","language":"","types":"imdbDisplay","attributes":"","isOriginalTitle":"0"}
{"tconst":"tt0000001","titleType":"short","primaryTitle":"Carmencita","originalTitle":"Carmencita","isAdult":"0","startYear":"1894","endYear":"","runtimeMinutes":"1","genres":"Documentary,Short"}
{"tconst":"tt0000001","directors":"nm0005690","writers":""}
{"tconst":"tt0020666","parentTconst":"tt15180956","seasonNumber":"1","episodeNumber":"2"}
{"tconst":"tt0000001","ordering":1,"nconst":"nm1588970","category":"self","job":"","characters":"[\"Self\"]"}
{"tconst":"tt0000001","averageRating":5.7,"numVotes":1890}
