# Notebook Load Film csvs
Load the two csvs from the Adhoc folder with the right datatypes ready for AI Skills

## Setup

### Parameters

In [64]:
pFilepath = "Files/adhoc"

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 71, Finished, Available, Finished)

### Imports

In [65]:
from pyspark.sql.types import StructType, StructField, ArrayType, BooleanType, DecimalType, IntegerType, LongType, StringType
from notebookutils import mssparkutils

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 72, Finished, Available, Finished)

### Disable v-order for fast load

In [66]:
spark.conf.set('spark.sql.parquet.vorder.enabled', 'false')

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 73, Finished, Available, Finished)

### Reset
Remove exist tables

In [67]:
%%sql
DROP TABLE IF EXISTS films250;
DROP TABLE IF EXISTS filmsgross;

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 75, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

### Schemas

In [68]:
# Create a dictionary holding the table name and accompanying schema
dict_schemas = {
    'films250': StructType([
        StructField("primaryTitle", StringType(), nullable=False),
        StructField("startYear", IntegerType(), nullable=False),
        StructField("runtimeMinutes", IntegerType(), nullable=False),
        StructField("genres", StringType(), nullable=False),
        StructField("numVotes", IntegerType(), nullable=False),
        StructField("averageRating", DecimalType(), nullable=False),
        StructField("r1", IntegerType(), nullable=False),
        StructField("r2", IntegerType(), nullable=False)
    ]),
    'filmsgross': StructType([
        StructField("primaryTitle", StringType(), nullable=False),
        StructField("gross", LongType(), nullable=False)
    ])
}

for table_name, schema in dict_schemas.items():
    print(table_name, '\r\n', schema, '\r\n')

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 76, Finished, Available, Finished)

films250 
 StructType([StructField('primaryTitle', StringType(), False), StructField('startYear', IntegerType(), False), StructField('runtimeMinutes', IntegerType(), False), StructField('genres', StringType(), False), StructField('numVotes', IntegerType(), False), StructField('averageRating', DecimalType(10,0), False), StructField('r1', IntegerType(), False), StructField('r2', IntegerType(), False)]) 

filmsgross 
 StructType([StructField('primaryTitle', StringType(), False), StructField('gross', LongType(), False)]) 



## Main

### Load the files

In [69]:
# Load the table to a dataframe
df_films250 = spark.read.format("csv") \
    .option("header","true") \
    .schema(dict_schemas["films250"]) \
    .load(f"{pFilepath}/films250.csv")

# Save to the database
df_films250.write.format("delta").mode("overwrite").save(f"Tables/films250")

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 77, Finished, Available, Finished)

In [70]:
# Load the table to a dataframe
df_filmsgross = spark.read.format("csv") \
    .option("header","true") \
    .schema(dict_schemas["filmsgross"]) \
    .load(f"{pFilepath}/filmsgross.csv")

# Save to the database
df_filmsgross.write.format("delta").mode("overwrite").save(f"Tables/filmsgross")

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 78, Finished, Available, Finished)

### Loop version

### Get the list of files

In [71]:
# Get list of files to load - can't use wildcards here
files = notebookutils.fs.ls(f"{pFilepath}")

# Filter the list to include only CSV files
csv_files = [file for file in files if file.name.endswith('.csv')]

print(f"Total no. of files: {len(files)} -> Filtered files: {len(csv_files)}")

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 79, Finished, Available, Finished)

Total no. of files: 2 -> Filtered files: 2


!!TODO add schema to this

In [None]:
# Loop through the files and load them (serial)
for file in csv_files:

    # Remove the .csv element from filename to make a clean table name
    clean_tablename = file.name.split('.csv')[0]
    
    print(f"Processing file: {file.name} -> Clean table name: {clean_tablename}")
  
    # Load the table to a dataframe
    df = spark.read.format("csv").option("header","true").load(f"{pFilepath}/{file.name}")
    
    # Save to the database
    df.write.format("delta").mode("overwrite").save(f"Tables/{clean_tablename}")

print(" ")
print("---- LOAD TABLES END ----")

## Queries

### Optimise the tables

In [72]:
%%sql
OPTIMIZE films250 VORDER;
OPTIMIZE filmsgross VORDER;

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 81, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 2 fields>

<Spark SQL result set with 1 rows and 2 fields>

### Count records
Should be 250 for both tables

In [73]:
%%sql
SELECT 'films250' s, COUNT(*) c FROM films250
UNION ALL
SELECT 'filmsgross', COUNT(*) FROM filmsgross;

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 82, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 2 fields>

### Find highly rated films not in the top 250 Gross

In [74]:
%%sql
SELECT primaryTitle
FROM films250 f
EXCEPT
SELECT primaryTitle
FROM filmsgross
ORDER BY 1;

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 83, Finished, Available, Finished)

<Spark SQL result set with 182 rows and 1 fields>

### Find high grossing films not in the top 250 rated

In [75]:
%%sql
SELECT primaryTitle
FROM filmsgross
EXCEPT
SELECT primaryTitle
FROM films250 f
ORDER BY 1;

StatementMeta(, e4b2dd93-8771-4a85-937f-ef88c047227f, 84, Finished, Available, Finished)

<Spark SQL result set with 180 rows and 1 fields>