In [None]:
import json
import os
from urllib.parse import quote_plus

import findspark
findspark.init()  # must run before pyspark is imported: it puts the Spark installation's pyspark on sys.path
import pyspark
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import pandas as pd
import psycopg2
from psycopg2 import sql as pgsql
import sqlalchemy
from sqlalchemy import create_engine

In [None]:
# Reuse the already-running SparkSession if there is one, otherwise start a new one,
# and display it so the session details show up in the notebook.
spark = SparkSession.builder.getOrCreate()
spark

# Values

In [None]:
# Input and output locations, relative to the notebook's working directory.
WIKIPEDIA_DUMP_XML_PATH="enwiki-latest-abstract.xml"  # produced by the download + gzip cells below
WIKIPEDIA_DUMP_SCHEMA_PATH="xml_schema.json"  # JSON form of a Spark StructType for the dump
MOVIES_METADATA_CSV_PATH="movies_metadata.csv"  # not downloaded by this notebook; must already exist
# NOTE(review): "ouput" looks like a typo for "output"; left as-is because other tooling
# may depend on this directory name — confirm before renaming.
TOP100_PARQUET_PATH="ouput/top100byratio_wiki.parquet"
TOP100_CSV_PATH="ouput/top100byratio_wiki.csv"

In [None]:
# PostgreSQL connection settings. Each value may be overridden by an environment
# variable of the same name so credentials do not have to live in the notebook;
# the literals are the defaults used when the variable is unset.
POSTGRES_HOST=os.environ.get("POSTGRES_HOST", "postgres-db")
POSTGRES_PWD=os.environ.get("POSTGRES_PWD", "postgres")
POSTGRES_USER=os.environ.get("POSTGRES_USER", "postgres")
POSTGRES_PORT=os.environ.get("POSTGRES_PORT", "5432")
POSTGRES_DB=os.environ.get("POSTGRES_DB", "movies")
POSTGRES_TABLE=os.environ.get("POSTGRES_TABLE", "top100byRatio")

# SQLAlchemy URL (used by pandas). User and password are percent-encoded so characters
# such as '@', ':' or '/' in a password cannot break the URL structure.
POSTGRES_CONNECTION_STRING="postgresql://{user}:{password}@{host}:{port}/{db}".format(user=quote_plus(POSTGRES_USER),
                                                                                      password=quote_plus(POSTGRES_PWD),
                                                                                      host=POSTGRES_HOST,
                                                                                      port=POSTGRES_PORT,
                                                                                      db=POSTGRES_DB)
# JDBC URL (used by Spark); user and password are supplied separately as connection properties.
JDBC_CONNECTION_STRING="jdbc:postgresql://{host}:{port}/{db}".format(host=POSTGRES_HOST,
                                                                     port=POSTGRES_PORT,
                                                                     db=POSTGRES_DB)

In [None]:
# Load the Spark schema for the Wikipedia abstract dump from its JSON description.
# It is passed to the XML reader below so the schema does not have to be inferred.
# `with` guarantees the file is closed even if json.load / fromJson raises;
# JSON is UTF-8, so the encoding is stated instead of relying on the platform default.
with open(WIKIPEDIA_DUMP_SCHEMA_PATH, encoding="utf-8") as xml_schema_file:
    WIKIPEDIA_DUMP_XML_SCHEMA=StructType.fromJson(json.load(xml_schema_file))
WIKIPEDIA_DUMP_XML_SCHEMA

In [None]:
# Options shared by the Spark JDBC writer and reader below. "url" and "table" are also
# looked up individually from this dict when calling .jdbc(); the whole dict is passed
# as the JDBC properties.
postgres_connection_properties = dict(
    url=JDBC_CONNECTION_STRING,
    table=POSTGRES_TABLE,
    user=POSTGRES_USER,
    password=POSTGRES_PWD,
    driver='org.postgresql.Driver',
)

# Download Files

In [None]:
!curl -o enwiki-latest-abstract.xml.gz https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-abstract.xml.gz

In [None]:
!gzip -d enwiki-latest-abstract.xml.gz

In [None]:
# Fail fast if the extracted dump is missing: the Wikipedia read further down needs it.
assert os.path.isfile(WIKIPEDIA_DUMP_XML_PATH), f"{WIKIPEDIA_DUMP_XML_PATH} not found"

In [None]:
# Fail fast if the movie metadata CSV is missing; no cell in this notebook downloads it.
assert os.path.isfile(MOVIES_METADATA_CSV_PATH), f"{MOVIES_METADATA_CSV_PATH} not found"

# Functions

In [None]:
@udf(returnType=StringType())
def extract_names(record):
    """Join the "name" field of each JSON-encoded object in ``record`` with ",".

    Applied to the result of ``from_json(production_companies, ArrayType(StringType()))``;
    each element is expected to be one JSON object rendered as text (presumably one
    production company — confirm against the data).

    Args:
        record: iterable of JSON strings, or None.

    Returns:
        The names joined with "," (no spaces). Returns None when ``record`` is None or empty,
        or when any element is not valid JSON, is not a JSON object, has no "name" key,
        or has a name that is not a string.
    """
    if record is None:
        return None

    items = list(record)
    if not items:
        return None

    try:
        names = [json.loads(item)["name"] for item in items]
        # join() stays inside the try: a null or numeric "name" raises TypeError here,
        # which would otherwise escape the UDF and fail the whole Spark task.
        return ",".join(names)
    except (ValueError, KeyError, TypeError):
        # ValueError: malformed JSON (json.JSONDecodeError); KeyError: no "name" key;
        # TypeError: null element, non-object JSON value, or non-string name.
        return None

# Process

### Read film metadata and select the fields of interest

In [None]:
# Load the movie metadata CSV, taking column names from its header row. No schema
# inference is requested, so every column is a string; types are cast further below.
metadata_df = spark.read.option("header", True).csv(MOVIES_METADATA_CSV_PATH)
metadata_df.show()

##### field 'year' will be obtained from 'release_date'
##### field 'vote_average' will be renamed 'rating' as requested

In [None]:
# Keep only the columns used by the rest of the pipeline.
metadata_columns = [
    "id", "imdb_id", "original_title", "budget", "revenue",
    "release_date", "vote_average", "production_companies",
]
budget_revenue_view = metadata_df.select(*metadata_columns)
budget_revenue_view.show()

In [None]:
# Row count before any cleaning, for comparison with the count after the ratio step.
budget_revenue_view.count()

##### Records with an invalid 'id' or 'imdb_id' are discarded; invalid 'budget' and 'revenue' values are not discarded but flagged later in the 'note' field
##### A regex detects invalid 'imdb_id' values; the cell below lists those records

In [None]:
# List the rows the imdb_id filter in the next cell discards: no 'tt' + 7 digits anywhere in
# the field. regexp_extract on a null imdb_id yields null and `null == ""` is never true, so
# without coalesce() those rows would be dropped by the next cell yet never appear here.
budget_revenue_view.where(F.coalesce(F.regexp_extract(F.col("imdb_id"), r'tt[0-9]{7}', 0), F.lit("")) == "").show()

##### Removing records with invalid 'id' and 'imdb_id'

In [None]:
# Keep rows whose imdb_id contains a 'tt' + 7-digit id and whose id casts to an int.
# (With ANSI mode off, a failed cast gives null, which the isNotNull() filter removes.)
has_imdb_id = F.regexp_extract(F.col("imdb_id"), r'tt[0-9]{7}', 0) != ""
clean_IDs = (
    budget_revenue_view
    .where(has_imdb_id)
    .withColumn("id", F.col("id").cast(IntegerType()))
    .where(F.col("id").isNotNull())
)
clean_IDs.show()

##### Casting columns to our desired dataType: all invalid values are being replaced by 'null' and invalid values of revenue or budget are copied to the new field 'note'
##### the field 'note' is used to flag any potential data type mismatch that can lead to wrong ratio computation or errors
##### the field 'year' is generated from the field 'release_date', which is first cast to DateType
##### field 'vote_average' has been renamed 'rating'

In [None]:
# Cast the raw string columns to typed columns and record data problems in 'note':
#   * a budget/revenue value that cannot be cast is copied verbatim into 'note'
#     (revenue is checked first, then budget, so a bad budget takes precedence);
#   * a budget or revenue of 0 sets 'note' to "Budget and/or revenue data missing".
# Rows with a non-null 'note' get ratio 0 in the next step.
# budget/revenue are cast to LongType rather than IntegerType: a value above
# 2,147,483,647 does not fit in an int, the cast yields null, and the row would be
# wrongly flagged as having an invalid value.
# Columns are referenced with F.col() so they resolve against clean_IDs itself rather
# than against the parent dataframe budget_revenue_view.
budget_as_long = F.col("budget").cast(LongType())
revenue_as_long = F.col("revenue").cast(LongType())
MISSING_DATA_NOTE = "Budget and/or revenue data missing"
budget_revenue_view_cast = (
    clean_IDs
    .withColumn("note", F.when(revenue_as_long.isNull(), F.col("revenue")).otherwise(F.lit(None)))
    .withColumn("note", F.when(budget_as_long.isNull(), F.col("budget")).otherwise(F.col("note")))
    .withColumn("budget", budget_as_long)
    .withColumn("revenue", revenue_as_long)
    .withColumn("note", F.when(F.col("revenue") == 0, MISSING_DATA_NOTE).otherwise(F.col("note")))
    .withColumn("note", F.when(F.col("budget") == 0, MISSING_DATA_NOTE).otherwise(F.col("note")))
    # 'year' is derived from release_date, which is dropped at the end.
    .withColumn("release_date", F.col("release_date").cast(DateType()))
    .withColumn("year", F.year("release_date"))
    .withColumn("vote_average", F.col("vote_average").cast(DoubleType()))
    .withColumnRenamed("vote_average", "rating")
    .drop("release_date")
)
budget_revenue_view_cast.show()

##### Computing revenue/budget ratio

In [None]:
# revenue/budget ratio rounded to 2 decimals; rows flagged in 'note' get ratio 0.
# F.round keeps the value numeric. F.format_number is not used because it returns a string
# with thousands separators (e.g. "1,234.50"), which casts to double as null, so every
# ratio >= 1000 would become null and fall to the bottom of the descending sort.
budget_revenue_ratio = budget_revenue_view_cast.withColumn(
    "ratio",
    F.when(F.col("note").isNull(), F.round(F.col("revenue") / F.col("budget"), 2))
     .otherwise(F.lit(0))
     .cast(DoubleType()),
)
budget_revenue_ratio.show()

In [None]:
# Row count after the id/imdb_id filtering, for comparison with the raw count above.
budget_revenue_ratio.count()

##### Ordering records by ratio, descending order. Top100 films

In [None]:
# Select the top 100 movies by ratio (nulls sort last in descending order).
# 'id' is a secondary sort key so rows with equal ratios are ordered the same way on every
# run; otherwise which tied rows survive limit(100) can change between runs.
# clean_title (lower-cased original_title) is the join key for the Wikipedia data.
top100_by_ratio = (
    budget_revenue_ratio
    .orderBy(F.desc("ratio"), F.asc("id"))
    .withColumn("clean_title", F.lower(F.col("original_title")))
    .limit(100)
)
top100_by_ratio.show()

##### Adjusting production company names

In [None]:
# Replace the raw production_companies text with a comma-separated list of names:
# parse it as an array of JSON strings, then let extract_names pull out each "name".
companies_json = F.from_json(F.col("production_companies"), ArrayType(StringType()))
clean_top100_by_ratio = top100_by_ratio.withColumn("production_companies", extract_names(companies_json))

In [None]:
# Inspect the top 100 after the production_companies clean-up.
clean_top100_by_ratio.show()

### Read data from Wikipedia dump

In [None]:
# Read the Wikipedia abstract dump with spark-xml: each <doc> element under the <feed>
# root becomes one row, typed by the schema loaded from WIKIPEDIA_DUMP_SCHEMA_PATH.
from_wikipedia_df = (
    spark.read
    .format("com.databricks.spark.xml")
    .options(rootTag="feed", rowTag="doc")
    .schema(WIKIPEDIA_DUMP_XML_SCHEMA)
    .load(WIKIPEDIA_DUMP_XML_PATH)
)
from_wikipedia_df.count()

##### Cleaning film titles to prepare for join step
##### Since the 'imdb_id' and 'id' fields are not available in the Wikipedia dump, the lower-cased title is used as the join key

In [None]:
# Keep the text after a "Wikipedia: " prefix in the page title (empty string when the
# prefix is absent) and lower-case it so it lines up with clean_title on the movie side.
wiki_page_title = F.regexp_extract(F.col("title"), r'Wikipedia: (.*)', 1)
clean_from_wikipedia_df = from_wikipedia_df.select(
    F.lower(wiki_page_title).alias("clean_title"),
    "url",
    "abstract",
)
clean_from_wikipedia_df.show()

### Enrich IMDB data

In [None]:
# Left join keeps every top-100 movie: url/abstract stay null when no Wikipedia title
# matches clean_title exactly, and a title matching several pages yields several rows.
top100_enriched = clean_top100_by_ratio.join(clean_from_wikipedia_df, on="clean_title", how="left")
top100_enriched.show()

### Save enriched data [optional]

In [None]:
# Save the enriched top 100 as a single parquet part file (coalesce(1)), without the join
# key and the id/note columns. release_date was already dropped; dropping it again is a no-op.
columns_to_drop = ["clean_title", "id", "imdb_id", "release_date", "note"]
(
    top100_enriched
    .drop(*columns_to_drop)
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(TOP100_PARQUET_PATH)
)

In [None]:
# Same columns as the parquet export, written as one pipe-delimited CSV part file with a header row.
columns_to_drop = ["clean_title", "id", "imdb_id", "release_date", "note"]
(
    top100_enriched
    .drop(*columns_to_drop)
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", 'True')
    .option("delimiter", '|')
    .csv(TOP100_CSV_PATH)
)

##### Create database 'movies'

In [None]:
# Connect to the PostgreSQL server without naming a database (libpq then falls back to its
# default database), because POSTGRES_DB may not exist yet.
conn = psycopg2.connect(
    host=POSTGRES_HOST,
    port=POSTGRES_PORT,
    user=POSTGRES_USER,
    password=POSTGRES_PWD,
)

In [None]:
# Create POSTGRES_DB when it does not exist yet.
# PostgreSQL has no CREATE DATABASE IF NOT EXISTS, so check pg_database first and run
# CREATE DATABASE only when no row matches. CREATE DATABASE cannot run inside a
# transaction block, hence autocommit.
conn.autocommit = True
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (POSTGRES_DB,))
        if cursor.fetchone() is None:
            # A database name cannot be a bound parameter; Identifier() quotes it safely.
            cursor.execute(pgsql.SQL("CREATE DATABASE {}").format(pgsql.Identifier(POSTGRES_DB)))
            print("Database has been created")
        else:
            print("Database already exists")
finally:
    conn.close()

## Write enriched data to Postgres

In [None]:
# Reload the parquet export as the source for the Postgres load. Parquet stores its own
# schema, so the CSV-only 'header' option has no meaning here and is not passed.
top100byratio_wiki_df = spark.read.parquet(TOP100_PARQUET_PATH)

##### Write parquet data to Postgres

In [None]:
# Write the data to Postgres over JDBC; "overwrite" mode replaces any existing table contents.
top100byratio_wiki_df.write.jdbc(
    url=postgres_connection_properties["url"],
    table=postgres_connection_properties["table"],
    mode="overwrite",
    properties=postgres_connection_properties,
)

##### Read the data back from Postgres to verify the write

In [None]:
# Read the table back over JDBC using the same url/table/credentials as the write.
top100byratio_postgres = spark.read.jdbc(
    postgres_connection_properties["url"],
    postgres_connection_properties["table"],
    properties=postgres_connection_properties,
)

In [None]:
# Display the rows read back from Postgres.
top100byratio_postgres.show()

# Query Postgres with pandas (through a SQLAlchemy engine using the psycopg2 driver)

In [None]:
# SQLAlchemy engine for pandas.read_sql; a plain "postgresql://" URL uses SQLAlchemy's default PostgreSQL driver (psycopg2).
engine = create_engine(POSTGRES_CONNECTION_STRING)

In [None]:
# Look up the 'Star Wars' row through pandas; the table name is a config constant, not user input.
star_wars_query = f"SELECT * FROM {POSTGRES_TABLE} WHERE original_title='Star Wars';"
pd_from_postgres = pd.read_sql(star_wars_query, engine)
pd_from_postgres

In [None]:
# Convert the pandas result back to a Spark DataFrame, reusing the schema of the JDBC read
# so column types match the table rather than being inferred from pandas dtypes.
postgres_schema = top100byratio_postgres.schema
from_pandas_df = spark.createDataFrame(pd_from_postgres, schema=postgres_schema)
from_pandas_df.show()

# Query Postgres directly

In [None]:
# Load the `sql` IPython extension (e.g. ipython-sql), which provides the %sql / %%sql magics used below.
%load_ext sql

In [None]:
%sql postgresql://postgres:postgres@postgres-db:5432/movies

In [None]:
%%sql
-- PostgreSQL case-folds unquoted identifiers, so Top100byRatio and top100byRatio (POSTGRES_TABLE) name the same table.
SELECT * FROM Top100byRatio WHERE original_title='Star Wars';

# Sources

# https://github.com/databricks/spark-xml
# https://jdbc.postgresql.org/download/
# https://spark.apache.org/docs/2.4.0/sql-data-sources-jdbc.html