### Step 1: Define the JDBC URL and connection properties for Azure Database for PostgreSQL

In [0]:
# JDBC (Java Database Connectivity, the standard Java API for accessing databases) URL for the Azure PostgreSQL server
jdbc_url = "jdbc:postgresql://nhlfinaldbserver.postgres.database.azure.com:5432/postgres"

# Define connection properties
connection_properties = {
    "user": "nhladmin",
    "password": "***",
    "driver": "org.postgresql.Driver"
}
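The URL follows the pattern `jdbc:postgresql://<host>:<port>/<database>`, with optional driver parameters appended as a query string. Below is a minimal sketch of assembling it from its parts. `build_jdbc_url` is a hypothetical helper, and the `sslmode=require` parameter is an assumption to check against your server's settings (Azure Database for PostgreSQL typically enforces SSL by default). The password above is masked. In Databricks it is generally safer to read it with `dbutils.secrets.get(scope=..., key=...)` than to hard-code it.

```python
from urllib.parse import urlencode


def build_jdbc_url(host: str, port: int = 5432, database: str = "postgres", **params: str) -> str:
    """Assemble a PostgreSQL JDBC URL: jdbc:postgresql://host:port/database[?k=v&...] (hypothetical helper)."""
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    if params:
        # Extra driver parameters (e.g. sslmode, an assumed setting here) go in the query string
        url += "?" + urlencode(params)
    return url


# Reproduces the URL used in this notebook
print(build_jdbc_url("nhlfinaldbserver.postgres.database.azure.com"))
# Same server with SSL explicitly required (assumption: adjust to your server's policy)
print(build_jdbc_url("nhlfinaldbserver.postgres.database.azure.com", sslmode="require"))
```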



In [0]:
# Test the connection with a basic query
try:
    # Use a simple query that doesn't rely on any existing tables
    query = "(SELECT 1) AS test_query"
    
    # Load the result of the query into a DataFrame
    testing123_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
    
    # Show the result to verify the connection
    testing123_df.show()
    
    print("Connection to PostgreSQL database successful!")
    
except Exception as e:
    print(f"Failed to connect to PostgreSQL: {str(e)}")

+--------+
|?column?|
+--------+
|       1|
+--------+

Connection to PostgreSQL database successful!
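Spark does not run the `table` argument on its own. Instead, it places it inside a `FROM` clause, for example in a schema probe of the form `SELECT * FROM <table> WHERE 1=0`. That is why the test query is parenthesized and given an alias. Below is a minimal sketch of that wrapping. `as_jdbc_table` and `schema_probe_sql` are hypothetical helpers, and the probe string only approximates Spark's internal SQL. On Spark 2.4 and later, `spark.read.format("jdbc").option("query", "SELECT 1")` performs this wrapping for you. The `?column?` header in the output above is PostgreSQL's default name for an unnamed expression, and `SELECT 1 AS ok` would give the column a name.

```python
def as_jdbc_table(query: str, alias: str = "subq") -> str:
    """Wrap a SELECT so it is valid as the `table` argument of spark.read.jdbc (hypothetical helper)."""
    q = query.strip().rstrip(";")  # a trailing semicolon is not allowed inside a subquery
    return f"({q}) AS {alias}"


def schema_probe_sql(table_expr: str) -> str:
    """Approximate the schema-only query Spark sends before reading; it returns no rows."""
    return f"SELECT * FROM {table_expr} WHERE 1=0"


test_query = as_jdbc_table("SELECT 1", alias="test_query")
print(test_query)                    # (SELECT 1) AS test_query
print(schema_probe_sql(test_query))  # SELECT * FROM (SELECT 1) AS test_query WHERE 1=0
```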


### Step 2: Map source folders to PostgreSQL table names

In [0]:
# Define folder paths and corresponding table names
folders_and_tables = {
    "gold/game_goalie_stats": "game_goalie_stats",
    "gold/player_info": "player_info",
    "gold/game_skater_stats": "game_skater_stats",
    "gold/team_info": "team_info",
    "gold/game_plays_xy": "game_plays"
}
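Source folder names and target table names do not always match (for example, `gold/game_plays_xy` maps to `game_plays`). A small check before writing can therefore catch typos and accidental collisions. Below is a minimal sketch, assuming the `/mnt/nhl-finalproject` mount used in Step 3. `check_table_mapping` is a hypothetical helper. It accepts only lowercase unquoted identifiers because PostgreSQL folds unquoted names to lowercase. It also rejects two folders targeting one table, since with `mode='overwrite'` the second write would replace the first.

```python
from __future__ import annotations

import re

# Unquoted identifier, lowercase only: PostgreSQL folds unquoted names to lowercase
_IDENT = re.compile(r"^[a-z_][a-z0-9_]*$")
_MAX_IDENT_LEN = 63  # PostgreSQL's default identifier length limit (NAMEDATALEN - 1)


def check_table_mapping(mapping: dict[str, str], mount_root: str) -> dict[str, str]:
    """Validate target table names and return {full_source_path: table_name} (hypothetical helper)."""
    seen: set[str] = set()
    resolved: dict[str, str] = {}
    for folder, table in mapping.items():
        if not _IDENT.match(table) or len(table) > _MAX_IDENT_LEN:
            raise ValueError(f"{table!r} is not a safe unquoted PostgreSQL table name")
        if table in seen:
            raise ValueError(f"{table!r} is targeted by more than one folder")
        seen.add(table)
        resolved[f"{mount_root.rstrip('/')}/{folder}"] = table
    return resolved


folders_and_tables = {
    "gold/game_goalie_stats": "game_goalie_stats",
    "gold/player_info": "player_info",
    "gold/game_skater_stats": "game_skater_stats",
    "gold/team_info": "team_info",
    "gold/game_plays_xy": "game_plays",
}
for path, table in check_table_mapping(folders_and_tables, "/mnt/nhl-finalproject").items():
    print(path, "->", table)
```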

### Step 3: Load each Delta table into a Spark DataFrame and write it to PostgreSQL

In [0]:
# Read each Delta table and write it to the matching PostgreSQL table
for folder, table_name in folders_and_tables.items():
    folder_path = f"/mnt/nhl-finalproject/{folder}"

    try:
        # Load the Delta table in this folder (Parquet data files plus a _delta_log transaction log)
        df = spark.read.format("delta").load(folder_path)
        num_rows = df.count()

        # Skip empty tables so the existing PostgreSQL table is not replaced with an empty one
        if num_rows == 0:
            print(f"No data found in {folder}. Skipping write to {table_name}.")
            continue

        # Spark's JDBC writer opens one connection per partition, so tables with more than
        # 100,000 rows are spread over 5 partitions (5 parallel connections);
        # smaller tables keep the partitioning they were read with
        if num_rows > 100000:
            batched_df = df.repartition(5)
        else:
            batched_df = df

        # Write to Azure PostgreSQL; with mode="overwrite" Spark drops and recreates
        # the table by default (set the "truncate" option to keep the existing table definition)
        batched_df.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode="overwrite",
            properties=connection_properties,
        )

        print(f"Successfully written data from {folder} to {table_name} in PostgreSQL.")

    except Exception as e:
        print(f"Failed to write data from {folder} to {table_name}: {e}")

print("Data loading completed!")


Successfully written data from gold/game_goalie_stats to game_goalie_stats in PostgreSQL.
Successfully written data from gold/player_info to player_info in PostgreSQL.
Successfully written data from gold/game_skater_stats to game_skater_stats in PostgreSQL.
Successfully written data from gold/team_info to team_info in PostgreSQL.
Successfully written data from gold/game_plays_xy to game_plays in PostgreSQL.
Data loading completed!
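The 100,000-row threshold and the fixed count of 5 partitions are rules of thumb. Because Spark's JDBC writer uses one connection per partition, the partition count is effectively the number of concurrent writers hitting the database. Spark's `batchsize` JDBC option (default 1000) sets how many rows go into each batched INSERT round trip. Below is a minimal sketch of deriving the partition count from the row total instead of fixing it. `choose_write_partitions`, `insert_batches_per_partition`, `rows_per_partition`, and `max_connections` are hypothetical names. The sketch assumes rows are spread evenly across partitions, which a round-robin `repartition` approximates.

```python
import math


def choose_write_partitions(num_rows: int, rows_per_partition: int = 100_000,
                            max_connections: int = 5) -> int:
    """Pick a partition count (= concurrent JDBC connections) for a write (hypothetical helper)."""
    wanted = math.ceil(num_rows / rows_per_partition)
    # At least one partition, and never more connections than the server should take
    return max(1, min(max_connections, wanted))


def insert_batches_per_partition(num_rows: int, partitions: int, batchsize: int = 1000) -> int:
    """Approximate batched INSERT round trips per partition, assuming an even row spread."""
    rows_each = math.ceil(num_rows / partitions)
    return math.ceil(rows_each / batchsize)


for rows in (50_000, 250_000, 10_000_000):
    n = choose_write_partitions(rows)
    print(rows, "rows ->", n, "partitions,", insert_batches_per_partition(rows, n), "batches each")
```

In the loop, `df.repartition(choose_write_partitions(num_rows))` could replace the fixed `repartition(5)`. When the goal is only to reduce the partition count, `coalesce` avoids the full shuffle that `repartition` performs.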
