In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
H

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-05-14 10:25:58--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.3’


2022-05-14 10:25:59 (4.90 MB/s) - ‘postgresql-42.2.16.jar.3’ saved [1002883/1002883]



In [3]:
# Start Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Project_Database").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
# Read CSV file from AWS bucket
from pyspark import SparkFiles
url = "https://kwasib-bucket.s3.amazonaws.com/openpowerlifting.csv"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("openpowerlifting.csv"), header=True, inferSchema=True)

In [5]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
df.show()

+--------------------+---+-----+---------+----+--------+--------+------------+-------------+--------+--------+--------+--------+------------+--------+--------+--------+--------+------------+-----------+-----------+-----------+-----------+---------------+-------+-----+------+---------+------------+---------+------+---------+----------+----------+-----------+---------+-------------+
|                Name|Sex|Event|Equipment| Age|AgeClass|Division|BodyweightKg|WeightClassKg|Squat1Kg|Squat2Kg|Squat3Kg|Squat4Kg|Best3SquatKg|Bench1Kg|Bench2Kg|Bench3Kg|Bench4Kg|Best3BenchKg|Deadlift1Kg|Deadlift2Kg|Deadlift3Kg|Deadlift4Kg|Best3DeadliftKg|TotalKg|Place| Wilks|McCulloch|Glossbrenner|IPFPoints|Tested|  Country|Federation|      Date|MeetCountry|MeetState|     MeetName|
+--------------------+---+-----+---------+----+--------+--------+------------+-------------+--------+--------+--------+--------+------------+--------+--------+--------+--------+------------+-----------+-----------+-----------+------

In [6]:
# Filter to include only SBD competitors whole placed first
filtered_df= df.filter(df["Event"] =="SBD").filter(df["Place"] =="1")
filtered_df.show()

+----------------+---+-----+---------+----+--------+--------+------------+-------------+--------+--------+--------+--------+------------+--------+--------+--------+--------+------------+-----------+-----------+-----------+-----------+---------------+-------+-----+------+---------+------------+---------+------+---------+----------+----------+-----------+---------+-------------+
|            Name|Sex|Event|Equipment| Age|AgeClass|Division|BodyweightKg|WeightClassKg|Squat1Kg|Squat2Kg|Squat3Kg|Squat4Kg|Best3SquatKg|Bench1Kg|Bench2Kg|Bench3Kg|Bench4Kg|Best3BenchKg|Deadlift1Kg|Deadlift2Kg|Deadlift3Kg|Deadlift4Kg|Best3DeadliftKg|TotalKg|Place| Wilks|McCulloch|Glossbrenner|IPFPoints|Tested|  Country|Federation|      Date|MeetCountry|MeetState|     MeetName|
+----------------+---+-----+---------+----+--------+--------+------------+-------------+--------+--------+--------+--------+------------+--------+--------+--------+--------+------------+-----------+-----------+-----------+-----------+------

In [7]:
# Drop columns that will not be needed for analysis
project_df= filtered_df.drop('Federation','Tested','IPFPoints','Glossbrenner','McCulloch','Equipment','Deadlift4Kg','Bench4Kg','Squat4Kg','Division','Squat1Kg', 'Squat2Kg', 'Squat3Kg','Bench1Kg','Bench2Kg','Bench3Kg','Deadlift1Kg', 'Deadlift2Kg', 'Deadlift3Kg')
project_df.show()

+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+------+---------+----------+-----------+---------+-------------+
|            Name|Sex|Event| Age|AgeClass|BodyweightKg|WeightClassKg|Best3SquatKg|Best3BenchKg|Best3DeadliftKg|TotalKg|Place| Wilks|  Country|      Date|MeetCountry|MeetState|     MeetName|
+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+------+---------+----------+-----------+---------+-------------+
|      Ash Morgan|  F|  SBD|23.0|   20-23|        59.8|           60|       125.0|        70.0|          150.0|  345.0|    1|385.63|     null|2018-10-27|  Australia|      VIC|Melbourne Cup|
| Briony Williams|  F|  SBD|36.0|   35-39|       108.0|          110|       220.0|       100.0|          200.0|  520.0|    1|424.49|     null|2018-10-27|  Australia|      VIC|Melbourne Cup|
|Brooke Kowalczyk|  F|  SBD|37.0|   35-39|        

In [8]:
# Drop NaN and null values
project_df= project_df.dropna()
project_df.show()

+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+------+---------+----------+-----------+---------+--------------------+
|            Name|Sex|Event| Age|AgeClass|BodyweightKg|WeightClassKg|Best3SquatKg|Best3BenchKg|Best3DeadliftKg|TotalKg|Place| Wilks|  Country|      Date|MeetCountry|MeetState|            MeetName|
+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+------+---------+----------+-----------+---------+--------------------+
|Dakoda Plumridge|  F|  SBD|27.0|   24-34|        78.6|         82.5|       182.5|       105.0|          205.0|  492.5|    1|455.17|Australia|2018-10-27|  Australia|      VIC|       Melbourne Cup|
|   Helene Faccio|  F|  SBD|50.0|   50-54|        55.2|           56|       137.5|        70.0|          182.5|  390.0|    1|464.08|Australia|2018-10-27|  Australia|      VIC|       Melbourne Cup|
|      Chris Le

In [9]:
# Change Date column  types
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql.functions import col
project_df.select(to_date(col("Date"),"MM-dd-yyyy"))

DataFrame[Date: string, to_date(`Date`, 'MM-dd-yyyy'): date]

In [10]:
# Rename columns
final_project_df = project_df.withColumnRenamed("Name", "competitor_name")\
                    .withColumnRenamed("Sex", "sex")\
                    .withColumnRenamed("Wilks","wilks_score")\
                    .withColumnRenamed("MeetCountry", "meet_country")\
                    .withColumnRenamed("MeetName", "meet_name")\
                    .withColumnRenamed("Date", "meet_date")

final_project_df.show()

+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+-----------+---------+----------+------------+---------+--------------------+
| competitor_name|sex|Event| Age|AgeClass|BodyweightKg|WeightClassKg|Best3SquatKg|Best3BenchKg|Best3DeadliftKg|TotalKg|Place|wilks_score|  Country| meet_date|meet_country|MeetState|           meet_name|
+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+-----------+---------+----------+------------+---------+--------------------+
|Dakoda Plumridge|  F|  SBD|27.0|   24-34|        78.6|         82.5|       182.5|       105.0|          205.0|  492.5|    1|     455.17|Australia|2018-10-27|   Australia|      VIC|       Melbourne Cup|
|   Helene Faccio|  F|  SBD|50.0|   50-54|        55.2|           56|       137.5|        70.0|          182.5|  390.0|    1|     464.08|Australia|2018-10-27|   Australia|      VIC|       

In [11]:
from pyspark.sql.functions import monotonically_increasing_id 
final_project_df=final_project_df.select("*").withColumn("competitor_id", monotonically_increasing_id()).withColumn("meet_id", monotonically_increasing_id()).withColumn("performance_id", monotonically_increasing_id())
final_project_df.show()

+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+-----------+---------+----------+------------+---------+--------------------+-------------+-------+--------------+
| competitor_name|sex|Event| Age|AgeClass|BodyweightKg|WeightClassKg|Best3SquatKg|Best3BenchKg|Best3DeadliftKg|TotalKg|Place|wilks_score|  Country| meet_date|meet_country|MeetState|           meet_name|competitor_id|meet_id|performance_id|
+----------------+---+-----+----+--------+------------+-------------+------------+------------+---------------+-------+-----+-----------+---------+----------+------------+---------+--------------------+-------------+-------+--------------+
|Dakoda Plumridge|  F|  SBD|27.0|   24-34|        78.6|         82.5|       182.5|       105.0|          205.0|  492.5|    1|     455.17|Australia|2018-10-27|   Australia|      VIC|       Melbourne Cup|            0|      0|             0|
|   Helene Faccio|  F|  SBD|50.0|   50-5

In [12]:
# Create Competitor table Dataframe
competitor_df=final_project_df.select("*").withColumn("competitor_id", monotonically_increasing_id())
competitor_df= final_project_df.select(["competitor_id","competitor_name", "sex","country"])
competitor_df.show()

+-------------+----------------+---+---------+
|competitor_id| competitor_name|sex|  country|
+-------------+----------------+---+---------+
|            0|Dakoda Plumridge|  F|Australia|
|            1|   Helene Faccio|  F|Australia|
|            2|      Chris Lepp|  M|Australia|
|            3|      Emad Nayef|  M|Australia|
|            4|   Luke Faulkner|  M|Australia|
|            5|  Warrick Eccles|  M|Australia|
|            6|    Ace Kirkwood|  M|Australia|
|            7|  Billa Hamilton|  F|Australia|
|            8| Yvonne Wagstaff|  F|Australia|
|            9|Nina Markopoulos|  F|Australia|
|           10|    Albert Ozdil|  M|Australia|
|           11|      Dylan Hart|  M|Australia|
|           12|  Rachael Savage|  F|Australia|
|           13|    Tayler Smith|  M|Australia|
|           14|    Jiville Latu|  F|Australia|
|           15| Victoria Hoskin|  F|Australia|
|           16| Ellie Burscough|  F|Australia|
|           17|      Chris Hall|  M|Australia|
|           1

In [13]:
# Create meet table DataFrame
meet_df = final_project_df.select(["meet_id","meet_name","meet_date" ,"meet_country"])
meet_df.show()


+-------+--------------------+----------+------------+
|meet_id|           meet_name| meet_date|meet_country|
+-------+--------------------+----------+------------+
|      0|       Melbourne Cup|2018-10-27|   Australia|
|      1|       Melbourne Cup|2018-10-27|   Australia|
|      2|       Melbourne Cup|2018-10-27|   Australia|
|      3|       Melbourne Cup|2018-10-27|   Australia|
|      4|       Melbourne Cup|2018-10-27|   Australia|
|      5|       Melbourne Cup|2018-10-27|   Australia|
|      6| Sunshine Coast Open|2018-08-25|   Australia|
|      7| Sunshine Coast Open|2018-08-25|   Australia|
|      8|  Victoria Qualifier|2013-04-28|   Australia|
|      9|  Victoria Qualifier|2013-04-28|   Australia|
|     10|  Victoria Qualifier|2013-04-28|   Australia|
|     11|  Victoria Qualifier|2013-04-28|   Australia|
|     12|Gold Coast Nation...|2018-01-21|   Australia|
|     13|  SA Early Qualifier|2018-02-18|   Australia|
|     14|  November Qualifier|2015-11-01|   Australia|
|     15| 

In [14]:
# Create Performance table Dataframe
performance_df= final_project_df.select(["performance_id","competitor_id","meet_id","wilks_score"])
performance_df.show()

+--------------+-------------+-------+-----------+
|performance_id|competitor_id|meet_id|wilks_score|
+--------------+-------------+-------+-----------+
|             0|            0|      0|     455.17|
|             1|            1|      1|     464.08|
|             2|            2|      2|     468.93|
|             3|            3|      3|     460.44|
|             4|            4|      4|     461.74|
|             5|            5|      5|     504.36|
|             6|            6|      6|     438.49|
|             7|            7|      7|     471.75|
|             8|            8|      8|     258.18|
|             9|            9|      9|     346.73|
|            10|           10|     10|     388.05|
|            11|           11|     11|     365.56|
|            12|           12|     12|     455.02|
|            13|           13|     13|     397.72|
|            14|           14|     14|     416.79|
|            15|           15|     15|      310.0|
|            16|           16| 

In [15]:
# Configure settings for RDS
mode = "append"
jdbc_url= "jdbc:postgresql://project-database.c34a9viyb12x.us-east-1.rds.amazonaws.com:5432/FuturePowerLifting"
config = {"user": "postgres",
          "password": "Fantastic",
          "driver": "org.postgresql.Driver"}

In [16]:
# Write competitor_df to table in RDS
competitor_df.write.jdbc(url=jdbc_url, table="competitor", mode= mode, properties= config)

In [None]:
 # Write meet_df to table in RDS
meet_df.write.jdbc(url=jdbc_url, table="meet", mode= mode, properties= config)

In [None]:
 # Write performance_df to table in RDS
performance_df.write.jdbc(url=jdbc_url, table="performance", mode= mode, properties= config)