# ALY 6110: Data Management and Big Data CRN Number: 70362
# Group Delta

### | Rushikesh Sawant | 

### OBJECTIVE OF CODE 
- Establish a connection to the Snowflake database

- Import data into Databricks

- Data Cleaning

- Merge tables as required

- Export / Create a new table within Snowflake

↓↓↓ Snowflake Connection Parameters ↓↓↓

In [None]:
# Connection settings passed to every Snowflake read/write in this notebook
# through `.options(**options)`.
# sfUser and sfPassword are blank here and must be filled in before running.
options = dict(
    sfUrl="cca21018.east-us-2.azure.snowflakecomputing.com",
    sfUser="",
    sfPassword="",
    sfDatabase="DATABRICK",
    sfSchema="PUBLIC",
    sfWarehouse="COMPUTE_WH",
)

In [None]:
# Load every row of the Snowflake `rating` table into a Spark DataFrame.
df_rating = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", "select * from rating;")
    .load()
)

# Show the loaded rows in the notebook.
display(df_rating)

TCONST,AVERAGERATING,NUMVOTES
tt0000001,6,2007
tt0000002,6,270
tt0000003,7,1914
tt0000004,6,178
tt0000005,6,2695
tt0000006,5,182
tt0000007,5,842
tt0000008,5,2154
tt0000009,5,207
tt0000010,7,7354


In [None]:
# Load every row of the Snowflake `title` table into a Spark DataFrame.
df_title = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", "select * from title;")
    .load()
)

# Show the loaded rows in the notebook.
display(df_title)

TCONST,ORIGINALTITLE,STARTYEAR,GENRES
tt0000001,Carmencita,1894.0,"Documentary,Short"
tt0000002,Le clown et ses chiens,1892.0,"Animation,Short"
tt0000003,Pauvre Pierrot,1892.0,"Animation,Comedy,Romance"
tt0000004,Un bon bock,1892.0,"Animation,Short"
tt0000005,Blacksmith Scene,1893.0,"Comedy,Short"
tt0000006,Chinese Opium Den,1894.0,Short
tt0000007,Corbett and Courtney Before the Kinetograph,1894.0,"Short,Sport"
tt0000008,Edison Kinetoscopic Record of a Sneeze,1894.0,"Documentary,Short"
tt0000009,Miss Jerry,1894.0,Romance
tt0000010,La sortie de l'usine LumiÃ¨re Ã Lyon,1895.0,"Documentary,Short"


In [None]:
# Load every row of the Snowflake `title_detail` table into a Spark DataFrame.
df_title_detail = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", "select * from title_detail;")
    .load()
)

# Show the loaded rows in the notebook.
display(df_title_detail)

TITLEID,TITLE,REGION,LANGUAGE
tt0000001,Карменсіта,UA,
tt0000001,Carmencita,DE,
tt0000001,Carmencita - spanyol tánc,HU,
tt0000001,Καρμενσίτα,GR,
tt0000001,Карменсита,RU,
tt0000001,Carmencita,US,
tt0000001,Carmencita,,
tt0000001,カルメンチータ,JP,ja
tt0000002,Le clown et ses chiens,,
tt0000002,Le clown et ses chiens,FR,


In [None]:
# Load every row of the Snowflake `crew_no` table into a Spark DataFrame.
df_crew_no = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", "select * from crew_no;")
    .load()
)

# Show the loaded rows in the notebook.
display(df_crew_no)

TCONST,DIRECTORS,WRITERS
tt0000001,nm0005690,
tt0000002,nm0721526,
tt0000003,nm0721526,
tt0000004,nm0721526,
tt0000005,nm0005690,
tt0000006,nm0005690,
tt0000007,"nm0005690,nm0374658",
tt0000008,nm0005690,
tt0000009,nm0085156,nm0085156
tt0000010,nm0525910,


In [None]:
# Load every row of the Snowflake `crew_detail` table into a Spark DataFrame.
df_crew_detail = (
    spark.read.format("snowflake")
    .options(**options)
    .option("query", "select * from crew_detail;")
    .load()
)

# Show the loaded rows in the notebook.
display(df_crew_detail)

NCONST,PRIMARYNAME,BIRTHYEAR,DEATHYEAR
nm0000001,Fred Astaire,1899.0,1987.0
nm0000002,Lauren Bacall,1924.0,2014.0
nm0000003,Brigitte Bardot,1934.0,
nm0000004,John Belushi,1949.0,1982.0
nm0000005,Ingmar Bergman,1918.0,2007.0
nm0000006,Ingrid Bergman,1915.0,1982.0
nm0000007,Humphrey Bogart,1899.0,1957.0
nm0000008,Marlon Brando,1924.0,2004.0
nm0000009,Richard Burton,1925.0,1984.0
nm0000010,James Cagney,1899.0,1986.0


### Data Cleaning

In [None]:
from pyspark.sql.functions import col
# Cleaning rules applied to every table:
#   * rows whose key column is NULL are removed;
#   * NULLs in text columns become the placeholder 'Unknown';
#   * NULLs in numeric columns become 0.
# NOTE: 0 is a sentinel for "missing" (e.g. STARTYEAR / BIRTHYEAR / DEATHYEAR);
# exclude it before computing aggregates over those columns.

# Cleaning df_rating table
df_rating = (
    df_rating
    .filter(col('TCONST').isNotNull())
    .na.fill(0, subset=['AVERAGERATING', 'NUMVOTES'])
)

# Cleaning df_title table
df_title = (
    df_title
    .filter(col('TCONST').isNotNull())
    .na.fill('Unknown', subset=['ORIGINALTITLE', 'GENRES'])
    .na.fill(0, subset=['STARTYEAR'])
)

# Cleaning df_title_detail table
df_title_detail = (
    df_title_detail
    .filter(col('TITLEID').isNotNull())
    .na.fill('Unknown', subset=['TITLE', 'REGION', 'LANGUAGE'])
)

# Cleaning df_crew_no table
# DIRECTORS is not carried forward, so it is dropped up front rather than
# being null-filled first and discarded afterwards.
df_crew_no = (
    df_crew_no
    .filter(col('TCONST').isNotNull())
    .drop('DIRECTORS')
    .na.fill('Unknown', subset=['WRITERS'])
)

# Cleaning df_crew_detail table
df_crew_detail = (
    df_crew_detail
    .filter(col('NCONST').isNotNull())
    .na.fill('Unknown', subset=['PRIMARYNAME'])
    .na.fill(0, subset=['BIRTHYEAR', 'DEATHYEAR'])
)

In [None]:
# Build the final dataset by joining the cleaned tables on the title identifier.
# Ratings and titles share the key name TCONST, so a using-column join keeps a
# single TCONST column in the result.
df_final = df_rating.join(df_title, "TCONST")

# title_detail calls its key TITLEID, so an explicit condition is required.
# The columns are bound to their owning DataFrames instead of bare col() names,
# so the condition cannot become ambiguous if either side gains a same-named
# column. Both TCONST and TITLEID remain in the output, as before.
df_final = df_final.join(
    df_title_detail,
    df_final["TCONST"] == df_title_detail["TITLEID"],
)

# Attach the crew columns (WRITERS) for each title.
df_final = df_final.join(df_crew_no, "TCONST")

# Preview the first joined row.
display(df_final.head())

Row(TCONST='tt0000001', AVERAGERATING=Decimal('6'), NUMVOTES=Decimal('2007'), ORIGINALTITLE='Carmencita', STARTYEAR=Decimal('1894'), GENRES='Documentary,Short', TITLEID='tt0000001', TITLE='Карменсіта', REGION='UA', LANGUAGE='Unknown', WRITERS='Unknown')

In [None]:
# Name of the destination table in Snowflake; change this single value to
# write the result somewhere else.
table_name = "Final_Dataset"

# Write DataFrame to Snowflake.
# Mode "append" adds rows to the table, so re-running this cell inserts the
# same rows again.
(
    df_final.write
    .format("snowflake")
    .options(**options)
    .option("dbtable", table_name)
    .mode("append")
    .save()
)

# Stop Spark session
# NOTE(review): in a Databricks notebook `spark` is provided by the platform;
# stopping it may affect later cells or other work on the same cluster.
# Confirm this call is really wanted here.
spark.stop()