In [3]:
# if you installed Spark on windows, 
# you may need findspark and need to initialize it prior to being able to use pyspark
# Also, you may need to initialize SparkContext yourself.
#import findspark
#findspark.find()
#findspark.init()
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import *

appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

# Create Spark Context with the new configurations rather than rely on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
#sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = SparkSession.builder.getOrCreate()

#Ingest data from the players CSV into Spark Dataframe. What is dataframe?
players_df = (spark.read
         .format("csv")
         .option("inferSchema", "true")
         .option("header","true")
         .load("/home/bigdata/Big-Data-Bowl/Data/players.csv")
      )

players_df.printSchema()

root
 |-- nflId: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- PositionAbbr: string (nullable = true)
 |-- EntryYear: integer (nullable = true)
 |-- DraftRound: string (nullable = true)
 |-- DraftNumber: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- College: string (nullable = true)



In [4]:
num_rows = players_df.count()
num_rows

1713

In [5]:
num_columns = len(players_df.columns)
num_columns

10

In [6]:
casted_types_df = (players_df.withColumn("nfl_id", players_df["nflId"].cast("string")).drop("nflId")
              .withColumn("Draft_Round", players_df["DraftRound"].cast("integer")).drop("DraftRound")
              .withColumn("Draft_Number", players_df["DraftNumber"].cast("integer")).drop("DraftNumber")
              .distinct()
           )

casted_types_df.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- PositionAbbr: string (nullable = true)
 |-- EntryYear: integer (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- College: string (nullable = true)
 |-- nfl_id: string (nullable = true)
 |-- Draft_Round: integer (nullable = true)
 |-- Draft_Number: integer (nullable = true)



<h2>Process NA/Null Values</h2>

After investigating this dataframe, no custom NULL values were found. There were some null Draft_Round and Draft_Round that were displayed as Null values when converted from string to integer datatype. 

In [7]:
casted_types_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in casted_types_df.columns]).show()

+---------+--------+------------+---------+------+------+-------+------+-----------+------------+
|FirstName|LastName|PositionAbbr|EntryYear|Height|Weight|College|nfl_id|Draft_Round|Draft_Number|
+---------+--------+------------+---------+------+------+-------+------+-----------+------------+
|        0|       0|           0|        0|     0|     0|      0|     0|        528|         528|
+---------+--------+------------+---------+------+------+-------+------+-----------+------------+



DRAFT_ROUND and DRAFT_NUMBER are INTEGERS, so they are imputable.

In [8]:
filled_df = casted_types_df.fillna(0,['Draft_Round','Draft_Number'])


imputer = Imputer (
inputCols=['Draft_Round','Draft_Number'],
outputCols=["{}_imputed".format(c) for c in ['Draft_Round','Draft_Number']]
).setStrategy("mean").setMissingValue(0)

imputed_df = imputer.fit(filled_df).transform(filled_df)

imputed_df_with_dropped_columns = imputed_df.drop('Draft_Round','Draft_Number')

full_df_final = imputed_df_with_dropped_columns.withColumnRenamed("Draft_Round_imputed","Draft_Round").withColumnRenamed("Draft_Number_imputed","Draft_Number")

full_df_final.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- PositionAbbr: string (nullable = true)
 |-- EntryYear: integer (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- College: string (nullable = true)
 |-- nfl_id: string (nullable = true)
 |-- Draft_Round: integer (nullable = true)
 |-- Draft_Number: integer (nullable = true)



Now, you need to normalize the data. (You could have normalized before the cleaning as well). We need to create another table that includes the player entry fields and remove the duplicates.

In [9]:
entry_df = full_df_final.select("EntryYear","Draft_Round","Draft_Number").distinct()
entry_df.count()

1201

If one of your key columns contain null, it's a good idea to add a unique identifier column. Notice I'm adding it to the extracted set of columns where the duplicate rows were eleminated. 

In [11]:
entry_df_with_id = entry_df.withColumn("entry_id", monotonically_increasing_id()+1)

In [12]:
entry_df_with_id.printSchema()

root
 |-- EntryYear: integer (nullable = true)
 |-- Draft_Round: integer (nullable = true)
 |-- Draft_Number: integer (nullable = true)
 |-- entry_id: long (nullable = false)



In [13]:
entry_df_with_id.show()

+---------+-----------+------------+--------+
|EntryYear|Draft_Round|Draft_Number|entry_id|
+---------+-----------+------------+--------+
|     2005|          1|          15|       1|
|     2015|          3|          71|       2|
|     2005|          7|         230|       3|
|     2011|          4|         131|       4|
|     2014|          4|         112|       5|
|     2014|          5|         171|       6|
|     2016|          6|         202|       7|
|     2017|          1|           2|       8|
|     2017|          1|           8|       9|
|     2017|          3|          65|      10|
|     2017|          3|          91|      11|
|     2007|          1|          12|      12|
|     2017|          6|         208|      13|
|     2011|          4|         122|      14|
|     2016|          5|         172|      15|
|     2009|          2|          49|      16|
|     2012|          1|          13|      17|
|     2015|          3|          84|      18|
|     2014|          1|           

Now, let's go ahead and add this unique identifier column to our original player dataframe (named full_df_final). We do this by rejoining the dataframes back using all three columns.

In [14]:
joined_df = full_df_final.join(entry_df_with_id,(entry_df_with_id.Draft_Round==full_df_final.Draft_Round)&(entry_df_with_id.Draft_Number==full_df_final.Draft_Number)&(full_df_final.EntryYear==entry_df_with_id.EntryYear))

In [15]:
joined_df.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- PositionAbbr: string (nullable = true)
 |-- EntryYear: integer (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- College: string (nullable = true)
 |-- nfl_id: string (nullable = true)
 |-- Draft_Round: integer (nullable = true)
 |-- Draft_Number: integer (nullable = true)
 |-- EntryYear: integer (nullable = true)
 |-- Draft_Round: integer (nullable = true)
 |-- Draft_Number: integer (nullable = true)
 |-- entry_id: long (nullable = false)



Now, it's  time to drop the columns from the players dataframe and leave the unique identifier column (entry_id) that we can use to join the dataframes together.

In [16]:
final_players_df = joined_df.drop("EntryYear","Draft_Round","Draft_Number")

In [17]:
final_players_df.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- PositionAbbr: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- College: string (nullable = true)
 |-- nfl_id: string (nullable = true)
 |-- entry_id: long (nullable = false)



Now, it's time to store your dataframes in the database. First, you need to create the tables. Use the printSchema() command for your dataframes to conclude your database table field names. Execute the SQL in PgAdmin. SQL is shown below! Execute the SQL commands <b>one-at-a-time</b>.

In [None]:
CREATE SCHEMA IF NOT EXISTS NFL;

DROP TABLE IF EXISTS NFL.PLAYERS;
DROP TABLE IF EXISTS NFL.ENTRY;
CREATE TABLE IF NOT EXISTS NFL.ENTRY(
EntryYear smallint,
Draft_Round smallint,
Draft_Number smallint,
entry_id integer primary key);

CREATE TABLE IF NOT EXISTS NFL.PLAYERS
(nfl_id varchar(10) primary key,
FirstName varchar(20),
LastName varchar(20),
PositionAbbr varchar(4),
Height varchar(20),
Weight smallint,
College varchar(20),
entry_id integer references NFL.ENTRY); 

Now, write the dataframes to your tables. Notice we start with the tables where there are foreign keys (i.e. the players table).

In [18]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
db_properties={}
db_properties['username']="postgres"
db_properties['password']="postgres"
# make sure to use the correct port number. These 
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['driver']="org.postgresql.Driver"

In [19]:
final_players_df.write.format("jdbc")\
.mode("overwrite")\
.option("url", "jdbc:postgresql://localhost:5432/postgres")\
.option("dbtable", "NFL.PLAYERS")\
.option("user", "postgres")\
.option("password", "postgres")\
.option("Driver", "org.postgresql.Driver")\
.save()

[Stage 38:>                                                         (0 + 1) / 1]                                                                                

In [20]:
entry_df_with_id.write.format("jdbc")\
.mode("overwrite")\
.option("url", "jdbc:postgresql://localhost:5432/postgres")\
.option("dbtable", "NFL.ENTRY")\
.option("user", "postgres")\
.option("password", "postgres")\
.option("Driver", "org.postgresql.Driver")\
.save()

Now, you may read the data back into dataframes and join them again.

In [21]:
players_df_after_db_read = sqlContext.read.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/postgres")\
    .option("dbtable", "NFL.PLAYERS")\
    .option("user", "postgres")\
    .option("password", "postgres")\
    .option("Driver", "org.postgresql.Driver")\
    .load()

players_df_after_db_read.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- PositionAbbr: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- College: string (nullable = true)
 |-- nfl_id: string (nullable = true)
 |-- entry_id: long (nullable = true)



In [22]:
players_df_after_db_read.show(5)

+---------+--------+------------+------+------+---------------+-------+--------+
|FirstName|LastName|PositionAbbr|Height|Weight|        College| nfl_id|entry_id|
+---------+--------+------------+------+------+---------------+-------+--------+
|    Kevin|   Huber|           P|6'01""|   211|     Cincinnati|  71333|    1023|
|Nathaniel|  Solder|           T|6'08""|   325|       Colorado|2495232|     130|
|   Ronald|   Leary|           G|6'03""|   320|        Memphis|2533454|     573|
|   Bennie|   Logan|          DT|6'02""|   315|Louisiana State|2540163|    1031|
|    Chris|   Smith|          DE|6'01""|   266|       Arkansas|2543692|    1192|
+---------+--------+------------+------+------+---------------+-------+--------+
only showing top 5 rows



In [23]:
entry_df_after_db_read = sqlContext.read.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/postgres")\
    .option("dbtable", "NFL.ENTRY")\
    .option("user", "postgres")\
    .option("password", "postgres")\
    .option("Driver", "org.postgresql.Driver")\
    .load()

entry_df_after_db_read.printSchema()

root
 |-- EntryYear: integer (nullable = true)
 |-- Draft_Round: integer (nullable = true)
 |-- Draft_Number: integer (nullable = true)
 |-- entry_id: long (nullable = true)



In [24]:
entry_df_after_db_read.show(5)

+---------+-----------+------------+--------+
|EntryYear|Draft_Round|Draft_Number|entry_id|
+---------+-----------+------------+--------+
|     2005|          1|          15|       1|
|     2015|          3|          71|       2|
|     2005|          7|         230|       3|
|     2011|          4|         131|       4|
|     2014|          4|         112|       5|
+---------+-----------+------------+--------+
only showing top 5 rows



In [25]:
fully_joined_df_after_db_read = players_df_after_db_read.join(entry_df_after_db_read,players_df_after_db_read.entry_id==entry_df_after_db_read.entry_id)

In [26]:
fully_joined_df_after_db_read.show(5)

+----------+--------+------------+------+------+-----------------+-------+--------+---------+-----------+------------+--------+
| FirstName|LastName|PositionAbbr|Height|Weight|          College| nfl_id|entry_id|EntryYear|Draft_Round|Draft_Number|entry_id|
+----------+--------+------------+------+------+-----------------+-------+--------+---------+-----------+------------+--------+
|    Telvin|   Smith|         OLB|6'03""|   215|    Florida State|2543711|     964|     2014|          5|         144|     964|
|   Anthony|Castonzo|           T|6'07""|   311|   Boston College|2495137|      29|     2011|          1|          22|      29|
|Benardrick|McKinney|         ILB|6'04""|   260|Mississippi State|2552490|      26|     2015|          2|          43|      26|
|     Bilal|  Powell|          RB|5'10""|   204|       Louisville|2495328|     474|     2011|          4|         126|     474|
|   Ricardo|   Allen|          SS|5'09""|   186|           Purdue|2543850|     418|     2014|          5