In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

app_name = 'final_project'

conf = SparkConf()

hdfs_host = 'hdfs://namenode:8020'

conf.set("hive.metastore.uris", "http://hive-metastore:9083")
conf.set("spark.kerberos.access.hadoopFileSystem", hdfs_host)
conf.set("spark.sql.warehouse.dir", f"{hdfs_host}/user/hive/warehouse")
conf.set("hive.metastore.warehouse.dir", f"{hdfs_host}/user/hive/warehouse")

spark = SparkSession\
  .builder\
  .appName(app_name)\
  .config(conf=conf)\
  .getOrCreate()

22/02/13 07:48:24 WARN Utils: Your hostname, avto-HP-Laptop resolves to a loopback address: 127.0.1.1; using 192.168.100.12 instead (on interface wlo1)
22/02/13 07:48:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/13 07:48:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/13 07:48:26 WARN FileSystem: Failed to initialize fileystem hdfs://namenode:8020/user/hive/warehouse: java.lang.IllegalArgumentException: java.net.UnknownHostException: namenode
22/02/13 07:48:26 WARN SharedState: Cannot qualify the warehouse path, leaving it unqualified.
java.lang.IllegalArgumentException: java.net.UnknownHostException: namenode
	at org.apache.hadoop.security.SecurityUtil.buildTok

In [3]:
data_lake_url = f'{hdfs_host}/data_lake'
print(data_lake_url)

hdfs://namenode:8020/data_lake


# Credits

In [4]:
import pyspark.sql.functions as f

from pyspark.sql.types import *

cast_schema = ArrayType(StructType([StructField('cast_id',IntegerType(),True),
                        StructField('character',StringType(),True),
                        StructField('credit_id',StringType(),True),
                        StructField('gender',IntegerType(),True),
                        StructField('id',IntegerType(),True),
                        StructField('name',StringType(),True),
                        StructField('order',IntegerType(),True),
                        StructField('profile_path',StringType(),True)]))


# Remove escape chars; Python None to JSON null
creds = spark\
      .read\
      .option("quote", "\"")\
      .option("escape", "\"")\
      .csv('credits.csv', header=True)\
      .withColumn('cast', f.regexp_replace(f.col('cast'), ': None', ': null'))\
      .withColumn('cast', f.regexp_replace(f.col('cast'), "\\\\'", ""))\
      .withColumn('cast', f.regexp_replace(f.col('cast'), "\\\\", ""))


creds = creds\
  .withColumn('cast_members', f.from_json(creds.cast, cast_schema))

# Records that have schema issues:

rec_with_issues = creds.where(f.col('cast_members').isNull())

# print(rec_with_issues.count())
# rec_with_issues.show()
# creds.show()

creds.columns

['cast', 'crew', 'id', 'cast_members']

In [5]:
creds.printSchema()

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: string (nullable = true)
 |-- cast_members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: integer (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: integer (nullable = true)
 |    |    |-- profile_path: string (nullable = true)



In [6]:
# Examples

# creds\
#   .select('id', 'cast_members.name', 'cast_members.gender', 'cast_members.character')\
#   .show(1, vertical=True, truncate=False)

# Arrays to individual records
cast_members = creds\
  .withColumn('member', f.explode('cast_members')).withColumnRenamed('id', 'film_id')\
  .select('film_id', 'member.name', 'member.gender', 'member.character', 'member.id')

cast_members.show(20, vertical=False, truncate=False)

+-------+-----------------+------+------------------------------+-------+
|film_id|name             |gender|character                     |id     |
+-------+-----------------+------+------------------------------+-------+
|862    |Tom Hanks        |2     |Woody (voice)                 |31     |
|862    |Tim Allen        |2     |Buzz Lightyear (voice)        |12898  |
|862    |Don Rickles      |2     |Mr. Potato Head (voice)       |7167   |
|862    |Jim Varney       |2     |Slinky Dog (voice)            |12899  |
|862    |Wallace Shawn    |2     |Rex (voice)                   |12900  |
|862    |John Ratzenberger|2     |Hamm (voice)                  |7907   |
|862    |Annie Potts      |1     |Bo Peep (voice)               |8873   |
|862    |John Morris      |0     |Andy (voice)                  |1116442|
|862    |Erik von Detten  |2     |Sid (voice)                   |12901  |
|862    |Laurie Metcalf   |1     |Mrs. Davis (voice)            |12133  |
|862    |R. Lee Ermey     |2     |Serg

In [7]:
from pyspark.sql.functions import monotonically_increasing_id

# Cast
cast = cast_members\
    .withColumnRenamed('id', 'cast_member_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'cast_member_id', 'film_id', 'name')

data_lake_path = f'{hdfs_host}/data_lake'

# cast.write.mode('overwrite').csv(f'{data_lake_path}/DimCast.csv')

cast.show()

+---+--------------+-------+-----------------+
| id|cast_member_id|film_id|             name|
+---+--------------+-------+-----------------+
|  0|            31|    862|        Tom Hanks|
|  1|         12898|    862|        Tim Allen|
|  2|          7167|    862|      Don Rickles|
|  3|         12899|    862|       Jim Varney|
|  4|         12900|    862|    Wallace Shawn|
|  5|          7907|    862|John Ratzenberger|
|  6|          8873|    862|      Annie Potts|
|  7|       1116442|    862|      John Morris|
|  8|         12901|    862|  Erik von Detten|
|  9|         12133|    862|   Laurie Metcalf|
| 10|          8655|    862|     R. Lee Ermey|
| 11|         12903|    862|    Sarah Freeman|
| 12|         37221|    862|    Penn Jillette|
| 13|          2157|   8844|   Robin Williams|
| 14|          8537|   8844|    Jonathan Hyde|
| 15|           205|   8844|    Kirsten Dunst|
| 16|        145151|   8844|   Bradley Pierce|
| 17|          5149|   8844|      Bonnie Hunt|
| 18|        

In [8]:

# Character
character = cast_members\
    .withColumnRenamed('id', 'cast_member_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'cast_member_id', 'character')

character.show()

+---+--------------+--------------------+
| id|cast_member_id|           character|
+---+--------------+--------------------+
|  0|            31|       Woody (voice)|
|  1|         12898|Buzz Lightyear (v...|
|  2|          7167|Mr. Potato Head (...|
|  3|         12899|  Slinky Dog (voice)|
|  4|         12900|         Rex (voice)|
|  5|          7907|        Hamm (voice)|
|  6|          8873|     Bo Peep (voice)|
|  7|       1116442|        Andy (voice)|
|  8|         12901|         Sid (voice)|
|  9|         12133|  Mrs. Davis (voice)|
| 10|          8655|    Sergeant (voice)|
| 11|         12903|      Hannah (voice)|
| 12|         37221|TV Announcer (voice)|
| 13|          2157|        Alan Parrish|
| 14|          8537|Samuel Alan Parri...|
| 15|           205|        Judy Sheperd|
| 16|        145151|      Peter Shepherd|
| 17|          5149|       Sarah Whittle|
| 18|         10739|       Nora Shepherd|
| 19|         58563|        Carl Bentley|
+---+--------------+--------------

In [9]:

# Cast Roles
VOICE = 'voice'
PLAYER = 'actor/actress'

role = cast_members\
    .withColumn('is_voice', f.col('character').contains('(voice)'))\
    .withColumn('role', f.when(f.col('is_voice'), VOICE).otherwise(PLAYER))\
    .withColumnRenamed('id', 'cast_member_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'cast_member_id', 'role').where(f.col('role') == PLAYER)

role.show()

+---+--------------+-------------+
| id|cast_member_id|         role|
+---+--------------+-------------+
| 13|          2157|actor/actress|
| 14|          8537|actor/actress|
| 15|           205|actor/actress|
| 16|        145151|actor/actress|
| 17|          5149|actor/actress|
| 18|         10739|actor/actress|
| 19|         58563|actor/actress|
| 20|          1276|actor/actress|
| 21|         46530|actor/actress|
| 22|         56523|actor/actress|
| 23|         51551|actor/actress|
| 24|         56522|actor/actress|
| 25|       1000304|actor/actress|
| 26|        188949|actor/actress|
| 27|       1076551|actor/actress|
| 28|       1480246|actor/actress|
| 29|         25024|actor/actress|
| 30|         27110|actor/actress|
| 31|         53715|actor/actress|
| 32|       1379424|actor/actress|
+---+--------------+-------------+
only showing top 20 rows



# Movies Metadata

In [10]:
meta = spark\
  .read\
  .option("multiLine", "true")\
  .option("quote", '"')\
  .option("header", "true")\
  .option("escape", '"')\
  .option("wholeFile", True)\
  .csv("movies_metadata.csv", header=True)\

# ამ სამ ჩანაწერს მართლა თავისი გაჭირვება აქვს და ჯობია, დავაიგნოროთ
meta = meta\
  .where("imdb_id not in ('0', 'tt0113002', 'tt2423504', 'tt2622826')")


# ეს ნაწილი უკეთ გადააკეთეთ, მაგდენსაც აღარ დავწერთ :დ არაერთ ველზე დაგჭირდებათ escape character-ების ჩანაცვლება
prod_countries = 'production_countries'
meta = meta\
  .withColumn(prod_countries, f.regexp_replace(f.col(prod_countries), ': None', ': null')) \
  .withColumn(prod_countries, f.regexp_replace(f.col(prod_countries), "\\\\'", " "))\
  .withColumn(prod_countries, f.regexp_replace(f.col(prod_countries), "\\\\", " "))

tmp = meta.select("spoken_languages", )

tmp.show()

meta.printSchema()


+--------------------+
|    spoken_languages|
+--------------------+
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'f...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
|[{'iso_639_1': 'e...|
+--------------------+
only showing top 20 rows

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-

In [11]:
prod_countries_schema = spark.read.json(meta.rdd.map(lambda row: row.production_countries)).schema

# discard '_corrupt_record'
# prod_countries_schema = StructType(list(prod_countries_schema)[1:])

prod_countries_schema = ArrayType(prod_countries_schema)

prod_countries_schema

                                                                                

ArrayType(StructType(List(StructField(iso_3166_1,StringType,true),StructField(name,StringType,true))),true)

In [12]:
meta = meta\
  .withColumn('prod_countries', f.from_json(f.col(prod_countries), prod_countries_schema))

# Records that have schema issues:

rec_with_issues = meta.where(f.col('prod_countries').isNull())

print(rec_with_issues.count())
rec_with_issues.show()

                                                                                

0
+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-------+----------------+------+-------+-----+-----+------------+----------+--------------+
|adult|belongs_to_collection|budget|genres|homepage| id|imdb_id|original_language|original_title|overview|popularity|poster_path|production_companies|production_countries|release_date|revenue|runtime|spoken_languages|status|tagline|title|video|vote_average|vote_count|prod_countries|
+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-------+----------------+------+-------+-----+-----+------------+----------+--------------+
+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+-----------------

In [13]:

meta\
  .select('id', 'imdb_id', 'prod_countries.name', 'title')\
  .show(truncate=False)


+-----+---------+--------------------------------------------------+------------------------------+
|id   |imdb_id  |name                                              |title                         |
+-----+---------+--------------------------------------------------+------------------------------+
|862  |tt0114709|[United States of America]                        |Toy Story                     |
|8844 |tt0113497|[United States of America]                        |Jumanji                       |
|15602|tt0113228|[United States of America]                        |Grumpier Old Men              |
|31357|tt0114885|[United States of America]                        |Waiting to Exhale             |
|11862|tt0113041|[United States of America]                        |Father of the Bride Part II   |
|949  |tt0113277|[United States of America]                        |Heat                          |
|11860|tt0114319|[Germany, United States of America]               |Sabrina                       |


In [58]:

# Films
films = meta.select('id', 'title', 'original_title')

+------+--------------------+--------------------+
|    id|               title|      original_title|
+------+--------------------+--------------------+
|  9091|        Sudden Death|        Sudden Death|
|  9691|           Assassins|           Assassins|
|   451|   Leaving Las Vegas|   Leaving Las Vegas|
| 35196|         Restoration|         Restoration|
| 11861|How To Make An Am...|How To Make An Am...|
|  8391|When Night Is Fal...|When Night Is Fal...|
| 26441|       The Big Green|       The Big Green|
| 97406|             Georgia|             Georgia|
|  9536|            Bio-Dome|            Bio-Dome|
|124626|           Nico Icon|           Nico Icon|
| 27526|  The Crossing Guard|  The Crossing Guard|
|  2086|        Nick of Time|        Nick of Time|
|   406|            La Haine|            La Haine|
| 63076|Heidi Fleiss: Hol...|Heidi Fleiss: Hol...|
| 43566|    Before and After|    Before and After|
| 78406|Steal Big Steal L...|Steal Big Steal L...|
| 55731|        Race the Sun|  

In [15]:

# Countries
prod_countries = meta\
    .withColumn('country', f.explode(f.col('prod_countries')))\
    .withColumn('country_id', monotonically_increasing_id())\
    .withColumnRenamed('id', 'film_id')\
    .select('country_id', 'film_id', 'country.name')

prod_countries.show()

+----------+-------+--------------------+
|country_id|film_id|                name|
+----------+-------+--------------------+
|         0|    862|United States of ...|
|         1|   8844|United States of ...|
|         2|  15602|United States of ...|
|         3|  31357|United States of ...|
|         4|  11862|United States of ...|
|         5|    949|United States of ...|
|         6|  11860|             Germany|
|         7|  11860|United States of ...|
|         8|  45325|United States of ...|
|         9|   9091|United States of ...|
|        10|    710|      United Kingdom|
|        11|    710|United States of ...|
|        12|   9087|United States of ...|
|        13|  12110|              France|
|        14|  12110|United States of ...|
|        15|  21032|United States of ...|
|        16|  10858|United States of ...|
|        17|   1408|              France|
|        18|   1408|             Germany|
|        19|   1408|               Italy|
+----------+-------+--------------

In [16]:
# Genres
genres_schema = ArrayType(StructType([StructField('id',IntegerType(),True),
                        StructField('name',StringType(),True)]))

genres = meta\
    .withColumn('genres_schema', f.from_json('genres', genres_schema))\
    .withColumn('genres_explode', f.explode(f.col('genres_schema')))\
    .withColumnRenamed('id', 'film_id')\
    .select('genres_explode.id', 'film_id', 'genres_explode.name')

genres.show()

+-----+-------+---------+
|   id|film_id|     name|
+-----+-------+---------+
|   16|    862|Animation|
|   35|    862|   Comedy|
|10751|    862|   Family|
|   12|   8844|Adventure|
|   14|   8844|  Fantasy|
|10751|   8844|   Family|
|10749|  15602|  Romance|
|   35|  15602|   Comedy|
|   35|  31357|   Comedy|
|   18|  31357|    Drama|
|10749|  31357|  Romance|
|   35|  11862|   Comedy|
|   28|    949|   Action|
|   80|    949|    Crime|
|   18|    949|    Drama|
|   53|    949| Thriller|
|   35|  11860|   Comedy|
|10749|  11860|  Romance|
|   28|  45325|   Action|
|   12|  45325|Adventure|
+-----+-------+---------+
only showing top 20 rows



In [17]:
# Homepages
homepages = meta\
    .withColumn('homepage_id', monotonically_increasing_id())\
    .withColumnRenamed('id', 'film_id')\
    .select('homepage_id', 'film_id', 'homepage')

homepages.show()

+-----------+-------+--------------------+
|homepage_id|film_id|            homepage|
+-----------+-------+--------------------+
|          0|    862|http://toystory.d...|
|          1|   8844|                null|
|          2|  15602|                null|
|          3|  31357|                null|
|          4|  11862|                null|
|          5|    949|                null|
|          6|  11860|                null|
|          7|  45325|                null|
|          8|   9091|                null|
|          9|    710|http://www.mgm.co...|
|         10|   9087|                null|
|         11|  12110|                null|
|         12|  21032|                null|
|         13|  10858|                null|
|         14|   1408|                null|
|         15|    524|                null|
|         16|   4584|                null|
|         17|      5|                null|
|         18|   9273|                null|
|         19|  11517|                null|
+----------

In [18]:
# imdb_id
imdb_id = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'imdb_id')

imdb_id.show()

+---+-------+---------+
| id|film_id|  imdb_id|
+---+-------+---------+
|  0|    862|tt0114709|
|  1|   8844|tt0113497|
|  2|  15602|tt0113228|
|  3|  31357|tt0114885|
|  4|  11862|tt0113041|
|  5|    949|tt0113277|
|  6|  11860|tt0114319|
|  7|  45325|tt0112302|
|  8|   9091|tt0114576|
|  9|    710|tt0113189|
| 10|   9087|tt0112346|
| 11|  12110|tt0112896|
| 12|  21032|tt0112453|
| 13|  10858|tt0113987|
| 14|   1408|tt0112760|
| 15|    524|tt0112641|
| 16|   4584|tt0114388|
| 17|      5|tt0113101|
| 18|   9273|tt0112281|
| 19|  11517|tt0113845|
+---+-------+---------+
only showing top 20 rows



In [19]:
# Overview
overview = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'overview').limit(10)

overview.show()

+---+-------+--------------------+
| id|film_id|            overview|
+---+-------+--------------------+
|  0|    862|Led by Woody, And...|
|  1|   8844|When siblings Jud...|
|  2|  15602|A family wedding ...|
|  3|  31357|Cheated on, mistr...|
|  4|  11862|Just when George ...|
|  5|    949|Obsessive master ...|
|  6|  11860|An ugly duckling ...|
|  7|  45325|A mischievous you...|
|  8|   9091|International act...|
|  9|    710|James Bond must u...|
+---+-------+--------------------+



In [20]:
# Popularity
popularity = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'popularity')

popularity.show()

+---+-------+----------+
| id|film_id|popularity|
+---+-------+----------+
|  0|    862| 21.946943|
|  1|   8844| 17.015539|
|  2|  15602|   11.7129|
|  3|  31357|  3.859495|
|  4|  11862|  8.387519|
|  5|    949| 17.924927|
|  6|  11860|  6.677277|
|  7|  45325|  2.561161|
|  8|   9091|   5.23158|
|  9|    710| 14.686036|
| 10|   9087|  6.318445|
| 11|  12110|  5.430331|
| 12|  21032| 12.140733|
| 13|  10858|     5.092|
| 14|   1408|  7.284477|
| 15|    524| 10.137389|
| 16|   4584| 10.673167|
| 17|      5|  9.026586|
| 18|   9273|  8.205448|
| 19|  11517|  7.337906|
+---+-------+----------+
only showing top 20 rows



In [21]:

# Production Companies
companies_schema = ArrayType(StructType([StructField('id',IntegerType(),True),
                        StructField('name',StringType(),True)]))

production_companies = prod_countries = meta\
    .withColumn('company', f.from_json(f.col('production_companies'), companies_schema))\
    .withColumn("company_name", f.explode('company.name'))\
    .withColumn('company_id', monotonically_increasing_id())\
    .withColumnRenamed('id', 'film_id')\
    .select('company_id', 'film_id', 'company_name')

production_companies.show()

+----------+-------+--------------------+
|company_id|film_id|        company_name|
+----------+-------+--------------------+
|         0|    862|Pixar Animation S...|
|         1|   8844|    TriStar Pictures|
|         2|   8844|        Teitler Film|
|         3|   8844|Interscope Commun...|
|         4|  15602|        Warner Bros.|
|         5|  15602|      Lancaster Gate|
|         6|  31357|Twentieth Century...|
|         7|  11862|Sandollar Product...|
|         8|  11862| Touchstone Pictures|
|         9|    949| Regency Enterprises|
|        10|    949|        Forward Pass|
|        11|    949|        Warner Bros.|
|        12|  11860|  Paramount Pictures|
|        13|  11860|Scott Rudin Produ...|
|        14|  11860|  Mirage Enterprises|
|        15|  11860|Sandollar Product...|
|        16|  11860|Constellation Ent...|
|        17|  11860|           Worldwide|
|        18|  11860|Mont Blanc Entert...|
|        19|  45325|Walt Disney Pictures|
+----------+-------+--------------

In [22]:

# Release Date
release_date = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'release_date')

release_date.show()

+---+-------+------------+
| id|film_id|release_date|
+---+-------+------------+
|  0|    862|  1995-10-30|
|  1|   8844|  1995-12-15|
|  2|  15602|  1995-12-22|
|  3|  31357|  1995-12-22|
|  4|  11862|  1995-02-10|
|  5|    949|  1995-12-15|
|  6|  11860|  1995-12-15|
|  7|  45325|  1995-12-22|
|  8|   9091|  1995-12-22|
|  9|    710|  1995-11-16|
| 10|   9087|  1995-11-17|
| 11|  12110|  1995-12-22|
| 12|  21032|  1995-12-22|
| 13|  10858|  1995-12-22|
| 14|   1408|  1995-12-22|
| 15|    524|  1995-11-22|
| 16|   4584|  1995-12-13|
| 17|      5|  1995-12-09|
| 18|   9273|  1995-11-10|
| 19|  11517|  1995-11-21|
+---+-------+------------+
only showing top 20 rows



In [23]:

# Revenue
revenue = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'revenue', 'imdb_id')

revenue.show()

+---+-------+---------+---------+
| id|film_id|  revenue|  imdb_id|
+---+-------+---------+---------+
|  0|    862|373554033|tt0114709|
|  1|   8844|262797249|tt0113497|
|  2|  15602|        0|tt0113228|
|  3|  31357| 81452156|tt0114885|
|  4|  11862| 76578911|tt0113041|
|  5|    949|187436818|tt0113277|
|  6|  11860|        0|tt0114319|
|  7|  45325|        0|tt0112302|
|  8|   9091| 64350171|tt0114576|
|  9|    710|352194034|tt0113189|
| 10|   9087|107879496|tt0112346|
| 11|  12110|        0|tt0112896|
| 12|  21032| 11348324|tt0112453|
| 13|  10858| 13681765|tt0113987|
| 14|   1408| 10017322|tt0112760|
| 15|    524|116112375|tt0112641|
| 16|   4584|135000000|tt0114388|
| 17|      5|  4300000|tt0113101|
| 18|   9273|212385533|tt0112281|
| 19|  11517| 35431113|tt0113845|
+---+-------+---------+---------+
only showing top 20 rows



In [24]:
# Runtime
runtime = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'runtime')

runtime.show()

+---+-------+-------+
| id|film_id|runtime|
+---+-------+-------+
|  0|    862|   81.0|
|  1|   8844|  104.0|
|  2|  15602|  101.0|
|  3|  31357|  127.0|
|  4|  11862|  106.0|
|  5|    949|  170.0|
|  6|  11860|  127.0|
|  7|  45325|   97.0|
|  8|   9091|  106.0|
|  9|    710|  130.0|
| 10|   9087|  106.0|
| 11|  12110|   88.0|
| 12|  21032|   78.0|
| 13|  10858|  192.0|
| 14|   1408|  119.0|
| 15|    524|  178.0|
| 16|   4584|  136.0|
| 17|      5|   98.0|
| 18|   9273|   90.0|
| 19|  11517|  103.0|
+---+-------+-------+
only showing top 20 rows



In [25]:

# Spoken Languages
languages_schema = ArrayType(StructType([StructField('iso_639_1', StringType(), True),
                        StructField('name', StringType(), True)]))

# meta.select(meta.spoken_languages).limit(10).write.csv('../data/csv/output/out1')

spoken_languages = meta\
    .withColumn('languages_explode', f.from_json(meta.spoken_languages, languages_schema))\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .withColumn('languages', f.explode(f.col('languages_explode.name')))\
    .select('id', 'film_id', 'languages')

spoken_languages.show()

+---+-------+---------+
| id|film_id|languages|
+---+-------+---------+
|  0|    862|  English|
|  1|   8844|  English|
|  1|   8844| Français|
|  2|  15602|  English|
|  3|  31357|  English|
|  4|  11862|  English|
|  5|    949|  English|
|  5|    949|  Español|
|  6|  11860| Français|
|  6|  11860|  English|
|  7|  45325|  English|
|  7|  45325|  Deutsch|
|  8|   9091|  English|
|  9|    710|  English|
|  9|    710|  Pусский|
|  9|    710|  Español|
| 10|   9087|  English|
| 11|  12110|  English|
| 11|  12110|  Deutsch|
| 12|  21032|  English|
+---+-------+---------+
only showing top 20 rows



In [26]:

# Status
status = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'status')

status.show()

+---+-------+--------+
| id|film_id|  status|
+---+-------+--------+
|  0|    862|Released|
|  1|   8844|Released|
|  2|  15602|Released|
|  3|  31357|Released|
|  4|  11862|Released|
|  5|    949|Released|
|  6|  11860|Released|
|  7|  45325|Released|
|  8|   9091|Released|
|  9|    710|Released|
| 10|   9087|Released|
| 11|  12110|Released|
| 12|  21032|Released|
| 13|  10858|Released|
| 14|   1408|Released|
| 15|    524|Released|
| 16|   4584|Released|
| 17|      5|Released|
| 18|   9273|Released|
| 19|  11517|Released|
+---+-------+--------+
only showing top 20 rows



In [27]:

# Vote Average

vote_average = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'vote_average')

vote_average.show()

+---+-------+------------+
| id|film_id|vote_average|
+---+-------+------------+
|  0|    862|         7.7|
|  1|   8844|         6.9|
|  2|  15602|         6.5|
|  3|  31357|         6.1|
|  4|  11862|         5.7|
|  5|    949|         7.7|
|  6|  11860|         6.2|
|  7|  45325|         5.4|
|  8|   9091|         5.5|
|  9|    710|         6.6|
| 10|   9087|         6.5|
| 11|  12110|         5.7|
| 12|  21032|         7.1|
| 13|  10858|         7.1|
| 14|   1408|         5.7|
| 15|    524|         7.8|
| 16|   4584|         7.2|
| 17|      5|         6.5|
| 18|   9273|         6.1|
| 19|  11517|         5.4|
+---+-------+------------+
only showing top 20 rows



In [28]:

# Vote Count
vote_count = meta\
    .withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', 'film_id', 'vote_count').limit(20)

vote_count.show()

# vote_count.write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://localhost:8888/vote_count.csv");

+---+-------+----------+
| id|film_id|vote_count|
+---+-------+----------+
|  0|    862|      5415|
|  1|   8844|      2413|
|  2|  15602|        92|
|  3|  31357|        34|
|  4|  11862|       173|
|  5|    949|      1886|
|  6|  11860|       141|
|  7|  45325|        45|
|  8|   9091|       174|
|  9|    710|      1194|
| 10|   9087|       199|
| 11|  12110|       210|
| 12|  21032|       423|
| 13|  10858|        72|
| 14|   1408|       137|
| 15|    524|      1343|
| 16|   4584|       364|
| 17|      5|       539|
| 18|   9273|      1128|
| 19|  11517|       224|
+---+-------+----------+



In [29]:
from pyspark.sql.functions import exp

def save_image(film_id: str) -> str:
    print("RUN")
    from imdb import IMDb
    # create an instance of the IMDb class
    ia = IMDb()

    # get a movie
    movie = ia.get_movie(film_id)
    return movie['cover url']


# for i in spark.sql('select imdb_id from films_v3.DimImdbId').limit(10).withColumn('url', )

from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: save_image(val[2:]), # do sth to val
    StringType()
)
urls = imdb_id.limit(10).withColumn('url', func_name(imdb_id.imdb_id))
urls.show()

# print(urls)

RUN
RUNage 34:>                                                         (0 + 1) / 1]
RUN
RUN
RUN
RUN
RUN
RUN
RUN
RUN


+---+-------+---------+--------------------+
| id|film_id|  imdb_id|                 url|
+---+-------+---------+--------------------+
|  0|    862|tt0114709|https://m.media-a...|
|  1|   8844|tt0113497|https://m.media-a...|
|  2|  15602|tt0113228|https://m.media-a...|
|  3|  31357|tt0114885|https://m.media-a...|
|  4|  11862|tt0113041|https://m.media-a...|
|  5|    949|tt0113277|https://m.media-a...|
|  6|  11860|tt0114319|https://m.media-a...|
|  7|  45325|tt0112302|https://m.media-a...|
|  8|   9091|tt0114576|https://m.media-a...|
|  9|    710|tt0113189|https://m.media-a...|
+---+-------+---------+--------------------+



                                                                                

In [30]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def save_image(film_id: str) -> str:
    from imdb import IMDb
    # create an instance of the IMDb class
    ia = IMDb()

    # get a movie
    movie = ia.get_movie(film_id)
    return movie['cover url']


staging = imdb_id


func_name = udf(
    lambda val: save_image(val[2:]), # do sth to val
    StringType()
)

urls = staging.limit(10).withColumn('url', func_name(imdb_id.imdb_id))
urls.show()

[Stage 37:>                                                         (0 + 1) / 1]

+---+-------+---------+--------------------+
| id|film_id|  imdb_id|                 url|
+---+-------+---------+--------------------+
|  0|    862|tt0114709|https://m.media-a...|
|  1|   8844|tt0113497|https://m.media-a...|
|  2|  15602|tt0113228|https://m.media-a...|
|  3|  31357|tt0114885|https://m.media-a...|
|  4|  11862|tt0113041|https://m.media-a...|
|  5|    949|tt0113277|https://m.media-a...|
|  6|  11860|tt0114319|https://m.media-a...|
|  7|  45325|tt0112302|https://m.media-a...|
|  8|   9091|tt0114576|https://m.media-a...|
|  9|    710|tt0113189|https://m.media-a...|
+---+-------+---------+--------------------+



                                                                                

In [53]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def save_image(film_id: str) -> str:
    from imdb import IMDb
    # create an instance of the IMDb class
    ia = IMDb()

    # get a movie
    movie = ia.get_movie(film_id)
    return movie['cover url']


staging = imdb_id.limit(3)

num_partitions = 2

partitions = []

for i in range(num_partitions):
    size = staging.count()/num_partitions
    partition = staging.where(f.col("id").between(i * size, (i+1) * size - 1) )
    partitions.append(partition)


save_corresponding_image = udf(
    lambda val: save_image(val[2:]),
    StringType()
)

urls = []

for partition in partitions:
    url = partition.withColumn('url', save_corresponding_image(imdb_id.imdb_id))
    url.show()
    urls.append(url)

main_table = urls[0]

for i in range(len(urls)):
    if i == 0:
        continue
    print(i)
    main_table = main_table.union(urls[i])

main_table.show()

print(main_table.head(num_partitions))

                                                                                

+---+-------+---------+--------------------+
| id|film_id|  imdb_id|                 url|
+---+-------+---------+--------------------+
|  0|    862|tt0114709|https://m.media-a...|
+---+-------+---------+--------------------+



                                                                                

+---+-------+---------+--------------------+
| id|film_id|  imdb_id|                 url|
+---+-------+---------+--------------------+
|  2|  15602|tt0113228|https://m.media-a...|
+---+-------+---------+--------------------+

1


                                                                                

+---+-------+---------+--------------------+
| id|film_id|  imdb_id|                 url|
+---+-------+---------+--------------------+
|  0|    862|tt0114709|https://m.media-a...|
|  2|  15602|tt0113228|https://m.media-a...|
+---+-------+---------+--------------------+



[Stage 272:>                                                        (0 + 1) / 1]

[Row(id=0, film_id='862', imdb_id='tt0114709', url='https://m.media-amazon.com/images/M/MV5BMDU2ZWJlMjktMTRhMy00ZTA5LWEzNDgtYmNmZTEwZTViZWJkXkEyXkFqcGdeQXVyNDQ2OTk4MzI@._V1_SX101_CR0,0,101,150_.jpg'), Row(id=2, film_id='15602', imdb_id='tt0113228', url='https://m.media-amazon.com/images/M/MV5BMjQxM2YyNjMtZjUxYy00OGYyLTg0MmQtNGE2YzNjYmUyZTY1XkEyXkFqcGdeQXVyMTQxNzMzNDI@._V1_SY150_CR0,0,101,150_.jpg')]


                                                                                

In [49]:
ratings = spark\
  .read\
  .option("multiLine", "true")\
  .option("quote", '"')\
  .option("header", "true")\
  .option("escape", '"')\
  .option("wholeFile", True)\
  .csv("ratings.csv", header=True)\

ratings.printSchema()

ratings = ratings\
    .withColumn('id', monotonically_increasing_id())\
    .select("id", "movieId", "rating").where(f.col("movieId") > 5)

ratings.show()

freq = ratings.groupby(f.col("movieId")).agg(f.count('*').alias("freq"))

freq.show()

avg = ratings.groupby(f.col('movieId')).agg(f.mean(f.col("rating")))\
  .withColumn('id', monotonically_increasing_id())\
  .withColumnRenamed('avg(rating)', 'rating')\
  .select('id', 'movieId', 'rating')

avg.show()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+---+-------+------+
| id|movieId|rating|
+---+-------+------+
|  0|    110|   1.0|
|  1|    147|   4.5|
|  2|    858|   5.0|
|  3|   1221|   5.0|
|  4|   1246|   5.0|
|  5|   1968|   4.0|
|  6|   2762|   4.5|
|  7|   2918|   5.0|
|  8|   2959|   4.0|
|  9|   4226|   4.0|
| 10|   4878|   5.0|
| 11|   5577|   5.0|
| 12|  33794|   4.0|
| 13|  54503|   3.5|
| 14|  58559|   4.0|
| 15|  59315|   5.0|
| 16|  68358|   5.0|
| 17|  69844|   5.0|
| 18|  73017|   5.0|
| 19|  81834|   5.0|
+---+-------+------+
only showing top 20 rows



                                                                                

+-------+-----+
|movieId| freq|
+-------+-----+
|    296|87901|
|  48738| 5758|
|   2136| 3409|
|   2294|12526|
|   6731| 1443|
| 115713| 9086|
|   3210| 9516|
|   7762|  667|
|   1090|18222|
|   2162| 2365|
|  88140| 6996|
|  50802|  566|
|    829| 1961|
|   3606| 1012|
|  89864| 4352|
|    467|  804|
|   4821| 1387|
|   2904|  153|
|    691| 1262|
|   3959| 3245|
+-------+-----+
only showing top 20 rows



[Stage 165:>                                                        (0 + 1) / 1]

+---+-------+------------------+
| id|movieId|            rating|
+---+-------+------------------+
|  0|    296| 4.169975313136369|
|  1|  48738| 3.858284126432789|
|  2|   2136|2.8157817541801116|
|  3|   2294|3.2492016605460643|
|  4|   6731|3.5634095634095635|
|  5| 115713|3.9377613911512217|
|  6|   3210|3.6424443043295502|
|  7|   7762|3.9280359820089954|
|  8|   1090|3.9067061793436504|
|  9|   2162| 2.483720930232558|
| 10|  88140|3.5182246998284734|
| 11|  50802| 2.926678445229682|
| 12|    829| 2.684344722080571|
| 13|   3606| 3.874505928853755|
| 14|  89864| 3.758157169117647|
| 15|    467|3.4284825870646767|
| 16|   4821| 3.192141312184571|
| 17|   2904| 3.584967320261438|
| 18|    691| 3.053090332805071|
| 19|   3959|3.6768875192604007|
+---+-------+------------------+
only showing top 20 rows



                                                                                

In [56]:
keywords = spark\
  .read\
  .option("multiLine", "true")\
  .option("quote", '"')\
  .option("header", "true")\
  .option("escape", '"')\
  .option("wholeFile", True)\
  .csv("keywords.csv", header=True)

keywords.printSchema()
keywords_schema = ArrayType(StructType([StructField('id',IntegerType(),True),
                        StructField('name',StringType(),True)]))


# companies_schema = ArrayType(StructType([StructField('id',IntegerType(),True),
#                         StructField('name',StringType(),True)]))
#
# production_companies = prod_countries = meta\
#     .withColumn('company', f.from_json(f.col('production_companies'), companies_schema))\
#     .withColumn("company_name", f.explode('company.name'))\
#     .withColumn('company_id', monotonically_increasing_id())\
#     .withColumnRenamed('id', 'film_id')\
#     .select('company_id', 'film_id', 'company_name')




keywords = keywords\
    .withColumn('keyword', f.from_json(f.col('keywords'), keywords_schema))\
    .withColumn('keyword', f.explode(f.col("keyword.name"))).withColumnRenamed('id', 'film_id')\
    .withColumn('id', monotonically_increasing_id()).select('id', 'film_id', 'keyword')

keywords.show()

keywords.toPandas().to_csv('out.csv')

root
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)

+---+-------+--------------------+
| id|film_id|             keyword|
+---+-------+--------------------+
|  0|    862|            jealousy|
|  1|    862|                 toy|
|  2|    862|                 boy|
|  3|    862|          friendship|
|  4|    862|             friends|
|  5|    862|             rivalry|
|  6|    862|       boy next door|
|  7|    862|             new toy|
|  8|    862|   toy comes to life|
|  9|   8844|          board game|
| 10|   8844|       disappearance|
| 11|   8844|based on children...|
| 12|   8844|            new home|
| 13|   8844|             recluse|
| 14|   8844|        giant insect|
| 15|  15602|             fishing|
| 16|  15602|         best friend|
| 17|  15602|duringcreditsstinger|
| 18|  15602|             old men|
| 19|  31357|      based on novel|
+---+-------+--------------------+
only showing top 20 rows



                                                                                