<a href="https://colab.research.google.com/github/MShiloni22/DDBMS_Project_A/blob/master/DDBMS_Project_A.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 56.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=101248c3cd8b51cead072e961fe8896f9cdb0f1d497beb71850d4421d2cb7405
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import os
import findspark
import datetime as dt

findspark.init()
from pyspark.sql import SparkSession
def init_spark(app_name: str):
 spark = SparkSession.builder.appName(app_name).getOrCreate()
 sc = spark.sparkContext
 return spark, sc
spark, sc = init_spark('demo')
sc

In [5]:
# working on queries.csv extraction
from pyspark.sql import SparkSession,Row, Column
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
queries_file = '/content/drive/MyDrive/Colab Files/DDBMS//queries.csv'
df = spark.read.csv(queries_file, header='True', inferSchema='True')

print("Loaded queries successfully")

# doing the same process for all columns
column_names = ["genres", "lang", "actors", "director", "cities", "country", 
                "from_realese_date", "production_company"]
for name in column_names:
  temp_name_1 = name + "1"
  temp_name_2 = name + "2"
  # removing irrelevant chars
  df = df.select("*", F.translate(F.col(name), "'[]", "")\
                .alias(temp_name_1))\
  .drop(name)

  # converting arrays strings to arrays of strings
  df = df.select("*", F.split(F.col(temp_name_1),",").alias(temp_name_2)) \
      .drop(temp_name_1)
  df = df.withColumnRenamed(temp_name_2,name)

print("Cleaned and seperated to arrays")

# creating columns for each element in each column contains an array
# for name in column_names:
#   name_size = name + "_size"
#   # get max array's size in the column
#   df = df.withColumn(name_size, F.size(F.col(name)))
#   df_col_max = df.agg({name_size: 'max'})
#   max_col_size = df_col_max.collect()[0][0]

#   for i in range(max_col_size):
#     df = df.withColumn(name + "_" + str(i), F.col(name)[i])

#   df = df.drop(name)
#   df = df.drop(name_size)

df.show()


Loaded queries successfully
Cleaned and seperated to arrays
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|user_id|              genres|                lang|              actors|            director|              cities|             country|from_realese_date|  production_company|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|    981| [Western,  Mystery]|  [English,  Srpski]|                  []|      [Nae Caranfil]|  [Haifa,  Tiberias]|                  []|           [2012]|[Katakuri-ke no K...|
|   3775|  [Action,  Western]|           [English]|                  []|                  []|          [Tel Aviv]|                  []|           [2013]|[Clavius Base,  T...|
|   4095|             [Crime]|[English,  עִבְרִית]|[Kenneth Alton

In [6]:
# working on credits.csv extraction
import re
credits_file = '/content/drive/MyDrive/Colab Files/DDBMS//credits.csv'
df = spark.read.csv(credits_file, header='True', inferSchema='True')

# load the data as you did before,
# just now change the delimiter to get evreything together
credits = spark.read.format("csv")\
.option("delimiter", "\t")\
.option("header","true")\
.option("inferSchema", "true")\
.load("drive/MyDrive/Colab Files/DDBMS//credits.csv")
prog = re.compile('\\[(.*?)\\]')
second_match = F.udf(lambda x: prog.findall(x)[1])
id_extract = F.udf(lambda x: x.split(",")[-1])
credits = credits\
.withColumn("id", id_extract("cast,crew,id"))\
.withColumn("cast", F.regexp_extract(F.col("cast,crew,id"), '\\[(.*?)\\]', 0
))\
.withColumn("crew", F.concat(F.lit("["),second_match("cast,crew,id"), F.lit(
"]")))\
.select("cast", "crew", "id")
df = credits
# df.printSchema()
print("Loaded credits successfully")

# doing the same process for all columns
column_names = ["cast", "crew"]
for name in column_names:
  temp_name_1 = name + "1"
  temp_name_2 = name + "2"
  # removing irrelevant chars
  df = df.select("*", F.translate(F.col(name), "\\{\\[\\]'\\}", "")\
                .alias(temp_name_1))\
  .drop(name)

  # # converting arrays strings to arrays of strings
  df = df.select("*", F.split(F.col(temp_name_1),",").alias(temp_name_2)) \
      .drop(temp_name_1)
  df = df.withColumnRenamed(temp_name_2,name)

print("Cleaned and seperated to arrays")

# for cast column - udf for extracting actors' names only from cast json string
actors_udf = F.udf(lambda arr: [arr[i][7:] for i in range(len(arr)) if i % 8 == 5])
df = df.withColumn('actors', actors_udf(F.col("cast")))\
  .drop("cast")

# for crew column - udf for extracting directors'' names only from crew json string
directors_udf = F.udf(lambda arr: [arr[i+1][7:] for i in range(len(arr))
 if arr[i] == " job: Director"])

df = df.withColumn('directors', directors_udf(F.col("crew")))\
  .drop("crew")

# converting arrays strings to arrays of strings
column_names = ["actors", "directors"]
for name in column_names:
  temp_name_1 = name + "1"
  temp_name_2 = name + "2"
  # removing irrelevant chars
  df = df.select("*", F.translate(F.col(name), "\\{\\[\\]'\\}", "")\
                .alias(temp_name_1))\
  .drop(name)

  # # converting arrays strings to arrays of strings
  df = df.select("*", F.split(F.col(temp_name_1),",").alias(temp_name_2)) \
      .drop(temp_name_1)
  df = df.withColumnRenamed(temp_name_2,name)

# for name in column_names:
#   name_size = name + "_size"
#   # get max array's size in the column
#   df = df.withColumn(name_size, F.size(F.col(name)))
#   df_col_max = df.agg({name_size: 'max'})
#   max_col_size = df_col_max.collect()[0][0]

#   for i in range(max_col_size):
#     df = df.withColumn(name + "_" + str(i), F.col(name)[i])

#   df = df.drop(name)
#   df = df.drop(name_size)

df.show()

Loaded credits successfully
Cleaned and seperated to arrays
+-----+--------------------+--------------------+
|   id|              actors|           directors|
+-----+--------------------+--------------------+
|  862|[Tom Hanks,  Tim ...|     [John Lasseter]|
| 8844|[Robin Williams, ...|      [Joe Johnston]|
|15602|[Walter Matthau, ...|     [Howard Deutch]|
|31357|[Whitney Houston,...|   [Forest Whitaker]|
|11862|[Steve Martin,  D...|     [Charles Shyer]|
|  949|[Al Pacino,  Robe...|      [Michael Mann]|
|11860|[Harrison Ford,  ...|    [Sydney Pollack]|
|45325|[Jonathan Taylor ...|      [Peter Hewitt]|
| 9091|[Jean-Claude Van ...|       [Peter Hyams]|
|  710|[Pierce Brosnan, ...|   [Martin Campbell]|
| 9087|[Michael Douglas,...|        [Rob Reiner]|
|12110|[Leslie Nielsen, ...|        [Mel Brooks]|
|21032|[Kevin Bacon,  Bo...|       [Simon Wells]|
|10858|[Anthony Hopkins,...|      [Oliver Stone]|
| 1408|[Geena Davis,  Ma...|      [Renny Harlin]|
|  524|[Robert De Niro, ...|   [Martin S

In [19]:
# working on movies.csv extraction
movies_file = '/content/drive/MyDrive/Colab Files/DDBMS//movies.csv'
df = spark.read.csv(movies_file, header='True', inferSchema='True')

print("Loaded movies successfully")
# doing the same process for all columns
column_names = ["genres", "production_companies", "production_countries", 
                "spoken_languages", "cities"]
for name in column_names:
  temp_name_1 = name + "1"
  temp_name_2 = name + "2"
  # removing irrelevant chars
  df = df.select("*", F.translate(F.col(name), "\\{\\[\\]'\\}", "")\
                .alias(temp_name_1))\
  .drop(name)

  # # converting arrays strings to arrays of strings
  df = df.select("*", F.split(F.col(temp_name_1),",").alias(temp_name_2)) \
      .drop(temp_name_1)
  df = df.withColumnRenamed(temp_name_2,name)

print("Cleaned and seperated to arrays")

# finished working on cities column, and seperating production_companies
# because it has different structure
column_names = ["genres", "production_countries", 
                "spoken_languages"]

# for each column - udf for extracting names only from json string
name_udf = F.udf(lambda arr: [arr[i][7:] for i in range(len(arr)) if i % 2 == 1])
for c in column_names:
  c_1 = c + "1"
  df = df.withColumn(c_1, name_udf(F.col(c)))\
    .drop(c)
prod_udf = F.udf(lambda arr: [arr[i][6:] for i in range(len(arr)) if i % 2 == 0])
df = df.withColumn("production_companies1", prod_udf(F.col("production_companies")))\
  .drop("production_companies")

# renameing columns names
column_names = ["genres", "production_companies", "production_countries", 
                "spoken_languages"]
for name in column_names:
  current_name = name + "1"
  df = df.withColumnRenamed(current_name,name)
df.show()

Loaded movies successfully
Cleaned and seperated to arrays
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|            overview|        release_date|             revenue|             tagline|               title|              cities|              genres|production_countries|    spoken_languages|production_companies|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     862|Led by Woody, And...|          30/10/1995|           373554033|                null|           Toy Story|[Eilat,  Tel Aviv...|[Animation, Comed...|[United States of...|           [English]|[Pixar Animation ...|
|    8844|When siblings Jud...|          15/12/1995|     