In [0]:
# Cleaning the data fetched from the API

In [0]:
%sql
-- Quick look at the raw export exactly as it sits on DBFS, before any typing or cleanup.
-- NOTE(review): querying a file path directly uses default CSV options (no header handling),
-- so columns presumably show up as _c0, _c1, ... with the header line as the first row.
SELECT *
FROM csv.`dbfs:/public/moviesproject/top250movies.csv`

In [0]:
# Load the raw export: the first line supplies column names and Spark infers
# initial column types, which the next cell refines with explicit casts.
# NOTE(review): several columns hold JSON arrays (parsed with from_json below).
# If the file escapes embedded quotes by doubling them (RFC 4180 / pandas style),
# Spark's default escape character (backslash) may mis-split those fields;
# consider .option("escape", '"') — confirm against the file.
top250movies = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('dbfs:/public/moviesproject/top250movies.csv')
)

In [0]:
# Give every numeric/date column an explicit type instead of relying on
# inferSchema's guess.
# Money columns use bigint: the worldwide gross of the biggest films exceeds the
# 32-bit int maximum (2,147,483,647). An int cast would overflow (silently
# wrapping from long, or raising under ANSI mode).
column_types = {
    "startYear": "int",
    "endYear": "int",
    "releaseDate": "date",
    "budget": "bigint",
    "grossWorldwide": "bigint",
    "averageRating": "float",
    "numVotes": "int",
    "runtimeMinutes": "int",
}
for column_name, spark_type in column_types.items():
    top250movies = top250movies.withColumn(
        column_name, top250movies[column_name].cast(spark_type)
    )

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType

# Schemas for the JSON-encoded list columns, used by from_json in the next cell.
# Five of them are plain arrays of strings; each name gets its own instance.
(
    interests_schema,
    externallinks_schema,
    langugages_schema,
    countries_schema,
    genres_schema,
) = (ArrayType(StringType()) for _ in range(5))

# productionCompanies is a list of objects, each with a name and an id (both strings).
production_schema = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("id", StringType()),
    ])
)

In [0]:
%python
from pyspark.sql.functions import col, explode_outer, from_json
df = top250movies.withColumn("interests", from_json(col("interests"), interests_schema)) \
    .withColumn("externalLinks", from_json(col("externalLinks"), externallinks_schema)) \
    .withColumn("spokenLanguages", from_json(col("spokenLanguages"), langugages_schema)) \
    .withColumn("countriesOfOrigin", from_json(col("countriesOfOrigin"), countries_schema)) \
    .withColumn("genres", from_json(col("genres"), genres_schema)) \
    .withColumn("productionCompanies", from_json(col("productionCompanies"), production_schema))

In [0]:
%sql
-- Target database for all cleaned tables; IF NOT EXISTS keeps the cell safe to re-run.
CREATE DATABASE IF NOT EXISTS movies_project

In [0]:
%sql
-- Make movies_project the session's current database, so the unqualified
-- table names passed to saveAsTable(...) in the next cell are created inside it.
USE DATABASE movies_project

In [0]:
# Persist the cleaned data as a set of tables in the current database:
# - `top250movies`: one row per movie with its scalar columns;
# - one table per list-valued column: one row per (movie_id, element).
#
# Every write uses overwriteSchema so a re-run can replace a table whose schema
# changed since the last run. For Delta tables (presumably the default here),
# a plain overwrite keeps the old schema and fails on mismatch, and mergeSchema
# keeps stale columns around.

MOVIE_COLUMNS = [
    "id", "url", "primaryTitle", "originalTitle", "type", "description",
    "primaryImage", "contentRating", "startYear", "endYear", "releaseDate",
    "filmingLocations", "budget", "grossWorldwide", "isAdult",
    "runtimeMinutes", "averageRating", "numVotes",
]

# Use a new name rather than re-binding `top250movies`. The from_json cell reads
# `top250movies` and needs the list columns this projection drops, so re-binding
# would break re-running that cell.
movies_df = df.select(*MOVIE_COLUMNS)
movies_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("top250movies")


def explode_to_table(source_df, array_col, table_name):
    """Explode one array column of `source_df` into its own table.

    Produces one row per array element, keyed by the movie id, and overwrites
    `table_name`. explode_outer keeps movies whose array is null or empty
    (with a null element), so no movie is dropped from the bridge table.

    Args:
        source_df: DataFrame with an `id` column and the array column.
        array_col: Name of the array column to explode; also the output column name.
        table_name: Table to (over)write in the current database.

    Returns:
        The exploded DataFrame with columns (movie_id, <array_col>).
    """
    exploded = source_df.select(
        col("id").alias("movie_id"),
        explode_outer(col(array_col)).alias(array_col),
    )
    exploded.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)
    return exploded


interests_df = explode_to_table(df, "interests", "interests")
externallinks_df = explode_to_table(df, "externalLinks", "externallinks")
# NOTE(review): "langugages" is misspelled. It is kept because other queries may
# already reference this table/variable name.
langugages_df = explode_to_table(df, "spokenLanguages", "langugages")
countries_df = explode_to_table(df, "countriesOfOrigin", "countries")
genres_df = explode_to_table(df, "genres", "genres")

# productionCompanies is an array of {name, id} structs. After exploding,
# flatten each struct into plain columns.
productionCompanies_df = (
    df.select(
        col("id").alias("movie_id"),
        explode_outer(col("productionCompanies")).alias("productionCompanies"),
    )
    .select(
        col("movie_id"),
        col("productionCompanies.name").alias("name"),
        col("productionCompanies.id").alias("company_id"),
    )
)
productionCompanies_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("productionCompanies")

In [0]:
%sql
-- Inspect the column types and table metadata of the movies table written above.
DESCRIBE FORMATTED top250movies