In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.91.38)] [1 InRelease 14.2 kB/88.7                                                                                Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.38)] [1 InRelease 28.6 kB/88.7 0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.38)]                                                                               Hit:3 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.38)]0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.38)]                                                                               Hit:4 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
0% [2 InRelease gpgv 15.9 kB] [Connecti

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-05 02:59:41--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.3’


2022-11-05 02:59:42 (1.67 MB/s) - ‘postgresql-42.2.9.jar.3’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session, putting the Postgres JDBC jar on the driver classpath
spark = (
    SparkSession.builder
    .appName("TrackingETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [33]:
# Fetch the week 2 tracking CSV from the public GCS bucket and load it into Spark
from pyspark import SparkFiles

url="https://storage.googleapis.com/big-data-bowl/week2.csv"
# Register the remote file with the SparkContext so SparkFiles can resolve it by name
spark.sparkContext.addFile(url)

# Comma-separated, first row is the header, column types inferred from the data
tracking_data_df = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("week2.csv"))
)

# Preview the first rows of the DataFrame
tracking_data_df.show()

+----------+------+-----+-------+--------------------+------------+----+-------------+-----+-----+----+----+----+------+-----+-----------+
|    gameId|playId|nflId|frameId|                time|jerseyNumber|team|playDirection|    x|    y|   s|   a| dis|     o|  dir|      event|
+----------+------+-----+-------+--------------------+------------+----+-------------+-----+-----+----+----+----+------+-----+-----------+
|2021091600|    65|40031|      1|2021-09-17 00:23:...|          23| NYG|        right|46.32|22.36|0.93|0.83|0.09|271.71|79.32|       None|
|2021091600|    65|40031|      2|2021-09-17 00:23:...|          23| NYG|        right|46.43|22.39|1.07|1.05|0.11|275.92|72.33|       None|
|2021091600|    65|40031|      3|2021-09-17 00:23:...|          23| NYG|        right|46.54|22.44|1.21|1.11|0.12|278.85|67.03|       None|
|2021091600|    65|40031|      4|2021-09-17 00:23:...|          23| NYG|        right|46.65|22.49|1.32|1.14|0.13|282.45|62.63|       None|
|2021091600|    65|40031|  

In [34]:
# Replace NA values in nflId with 0 values.
# withColumn returns a new DataFrame and leaves the original untouched, so the
# result has to be assigned back; otherwise the replacement is only displayed
# and the casts below would still operate on the uncleaned column.
from pyspark.sql.functions import regexp_replace
tracking_data_df = tracking_data_df.withColumn('nflId', regexp_replace('nflId', 'NA', '0'))
tracking_data_df.show(truncate=False)

# Cast the id/jersey columns to integers and the o/dir columns to floats
from pyspark.sql.types import IntegerType, FloatType
tracking_data_df = tracking_data_df.withColumn("nflId", tracking_data_df["nflId"].cast(IntegerType()))
tracking_data_df = tracking_data_df.withColumn("jerseyNumber", tracking_data_df["jerseyNumber"].cast(IntegerType()))
tracking_data_df = tracking_data_df.withColumn("o", tracking_data_df["o"].cast(FloatType()))
tracking_data_df = tracking_data_df.withColumn("dir", tracking_data_df["dir"].cast(FloatType()))

# Display the resulting (column, type) pairs to confirm the casts
tracking_data_df.dtypes

+----------+------+-----+-------+---------------------+------------+----+-------------+-----+-----+----+----+----+------+-----+-----------+
|gameId    |playId|nflId|frameId|time                 |jerseyNumber|team|playDirection|x    |y    |s   |a   |dis |o     |dir  |event      |
+----------+------+-----+-------+---------------------+------------+----+-------------+-----+-----+----+----+----+------+-----+-----------+
|2021091600|65    |40031|1      |2021-09-17 00:23:09.6|23          |NYG |right        |46.32|22.36|0.93|0.83|0.09|271.71|79.32|None       |
|2021091600|65    |40031|2      |2021-09-17 00:23:09.7|23          |NYG |right        |46.43|22.39|1.07|1.05|0.11|275.92|72.33|None       |
|2021091600|65    |40031|3      |2021-09-17 00:23:09.8|23          |NYG |right        |46.54|22.44|1.21|1.11|0.12|278.85|67.03|None       |
|2021091600|65    |40031|4      |2021-09-17 00:23:09.9|23          |NYG |right        |46.65|22.49|1.32|1.14|0.13|282.45|62.63|None       |
|2021091600|65    |4

[('gameId', 'int'),
 ('playId', 'int'),
 ('nflId', 'int'),
 ('frameId', 'int'),
 ('time', 'timestamp'),
 ('jerseyNumber', 'int'),
 ('team', 'string'),
 ('playDirection', 'string'),
 ('x', 'double'),
 ('y', 'double'),
 ('s', 'double'),
 ('a', 'double'),
 ('dis', 'double'),
 ('o', 'float'),
 ('dir', 'float'),
 ('event', 'string')]

Postgres Setup

In [6]:
# Prompt for the database password interactively so it is never written into the notebook
from getpass import getpass
password = getpass('Enter database password')

# Connection settings for the Cloud SQL Postgres instance
jdbc_url="jdbc:postgresql://34.72.136.99:5432/big-data-bowl"
# "append" adds rows to the target table on write
mode = "append"
# Properties passed to the JDBC writer
config = dict(
    user="postgres",
    password=password,
    driver="org.postgresql.Driver",
)


Enter database password··········


In [None]:
# Push the tracking DataFrame into the trackingdata table in Cloud SQL over JDBC,
# using the URL, write mode and connection properties configured earlier
tracking_data_df.write.jdbc(
    url=jdbc_url,
    table='trackingdata',
    mode=mode,
    properties=config,
)