# SQL, Database operations, and analysis
In this notebook we explore Spark SQL queries and database operations such as joins, using season batting statistics for three players. We then use the resulting DataFrames as input to a simple analysis.

## In this Notebook
1. create a Spark context
2. read in the data
3. register the tables for SQL
4. run SQL queries
5. perform database operations (joins)
6. do a simple analysis

In [1]:
### DO NOT RUN
### DO NOT RUN
### DO NOT RUN

# Initialize the Spark environment (takes about 1 minute)
import pyspark
conf = pyspark.SparkConf().setAppName('odl').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
sqlc = pyspark.sql.SQLContext(sc)

### DO NOT RUN
### DO NOT RUN
### DO NOT RUN

In [5]:
import pyspark.sql.functions as sf

## Read in Data

In [10]:
dataPaths = ["bbref/bryce.csv","bbref/javy.csv","bbref/andre.csv"]

In [15]:
dfbryce = sqlc.read.format("csv").option("header","true").option("inferSchema", "true").load(dataPaths[0])
dfjavyb = sqlc.read.format("csv").option("header","true").option("inferSchema", "true").load(dataPaths[1])
dfandre = sqlc.read.format("csv").option("header","true").option("inferSchema", "true").load(dataPaths[2])

In [17]:
dfbryce.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tm: string (nullable = true)
 |-- Lg: string (nullable = true)
 |-- G: integer (nullable = true)
 |-- PA: integer (nullable = true)
 |-- AB: integer (nullable = true)
 |-- R: integer (nullable = true)
 |-- H: integer (nullable = true)
 |-- 2B: integer (nullable = true)
 |-- 3B: integer (nullable = true)
 |-- HR: integer (nullable = true)
 |-- RBI: integer (nullable = true)
 |-- SB: integer (nullable = true)
 |-- CS: integer (nullable = true)
 |-- BB: integer (nullable = true)
 |-- SO: integer (nullable = true)
 |-- BA: double (nullable = true)
 |-- OBP: double (nullable = true)
 |-- SLG: double (nullable = true)
 |-- OPS: double (nullable = true)
 |-- OPS+: integer (nullable = true)
 |-- TB: integer (nullable = true)
 |-- GDP: integer (nullable = true)
 |-- HBP: integer (nullable = true)
 |-- SH: integer (nullable = true)
 |-- SF: integer (nullable = true)
 |-- IBB: integer (nullable = true)
 |-- Pos: st

## Register Tables

In [28]:
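# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
dfbryce.createOrReplaceTempView("bryce")
dfjavyb.createOrReplaceTempView("javyb")
dfandre.createOrReplaceTempView("andre")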

## SQL queries to produce dataframes

In [37]:
Bhits = sqlc.sql(""" SELECT Year,H,AB FROM bryce """)
Bhits.show()

+----+---+---+
|Year|  H| AB|
+----+---+---+
|2012|144|533|
|2013|116|424|
|2014| 96|352|
|2015|172|521|
|2016|123|506|
|2017|134|420|
|2018|137|550|
+----+---+---+
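
Registering a table gives the SQL engine a name to resolve in the `FROM` clause. The sketch below reproduces the same projection outside Spark, using Python's built-in `sqlite3` module as a stand-in (an assumption made so the example runs without a Spark installation). The rows are copied from the output above.

```python
import sqlite3

# Season rows (Year, H, AB) copied from the Bhits output.
bryce_rows = [
    (2012, 144, 533), (2013, 116, 424), (2014, 96, 352), (2015, 172, 521),
    (2016, 123, 506), (2017, 134, 420), (2018, 137, 550),
]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("CREATE TABLE bryce (Year INTEGER, H INTEGER, AB INTEGER)")
conn.executemany("INSERT INTO bryce VALUES (?, ?, ?)", bryce_rows)

# Same projection as the Spark query: three columns, every row.
projection = conn.execute(
    "SELECT Year, H, AB FROM bryce ORDER BY Year"
).fetchall()
for row in projection:
    print(row)
conn.close()
```

The SQL text is essentially the same as in the Spark cell; only the engine that resolves the name `bryce` differs.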



In [47]:
Bhits.explain(True)

== Parsed Logical Plan ==
'Project ['Year, 'H, 'AB]
+- 'UnresolvedRelation `bryce`

== Analyzed Logical Plan ==
Year: int, H: int, AB: int
Project [Year#888, H#896, AB#894]
+- SubqueryAlias bryce
   +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv

== Optimized Logical Plan ==
Project [Year#888, H#896, AB#894]
+- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv

== Physical Plan ==
*Project [Year#888, H#896, AB#894]
+- *FileScan csv [Year#888,AB#894,H#896] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/ec2-user/SageMaker/bbref/bryce.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Year:int,AB:int,H:int>


In [57]:
sqlc.sql(""" SELECT Year,H FROM bryce WHERE YEAR = 2015 """).show()
sqlc.sql(""" SELECT Year,H FROM bryce WHERE YEAR = 2015 """).explain(True)

+----+---+
|Year|  H|
+----+---+
|2015|172|
+----+---+

== Parsed Logical Plan ==
'Project ['Year, 'H]
+- 'Filter ('YEAR = 2015)
   +- 'UnresolvedRelation `bryce`

== Analyzed Logical Plan ==
Year: int, H: int
Project [Year#888, H#896]
+- Filter (YEAR#888 = 2015)
   +- SubqueryAlias bryce
      +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv

== Optimized Logical Plan ==
Project [Year#888, H#896]
+- Filter (isnotnull(YEAR#888) && (YEAR#888 = 2015))
   +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv

== Physical Plan ==
*Project [Year#888, H#896]
+- *Filter (isnotnull(YEAR#888) && (YEAR#888 = 2015))
   +- *FileScan csv [Year#888,H#896] Batched: false, Format: CSV, Loca
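
The optimized plan rewrites the filter as `isnotnull(YEAR) && (YEAR = 2015)`. In SQL, comparing NULL with `=` never evaluates to true, so the extra null check does not change which rows come back. Here is a minimal sketch of that equivalence, again using Python's built-in `sqlite3` as a stand-in for Spark SQL. The table name `seasons` and the row with a missing year are hypothetical and exist only for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE seasons (Year INTEGER, H INTEGER)")
# Two rows from the bryce data plus one hypothetical row with a missing Year.
conn.executemany(
    "INSERT INTO seasons VALUES (?, ?)",
    [(2015, 172), (2016, 123), (None, 0)],
)

# Filter as written in the notebook query.
plain = conn.execute(
    "SELECT Year, H FROM seasons WHERE Year = 2015"
).fetchall()

# Filter in the form the optimizer produces.
guarded = conn.execute(
    "SELECT Year, H FROM seasons WHERE Year IS NOT NULL AND Year = 2015"
).fetchall()

print(plain, guarded)
conn.close()
```

Both queries return the same single row, because the row with a NULL year fails the equality test either way.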

In [58]:
Jhits = sqlc.sql(""" SELECT Year,H,AB FROM javyb """)
Jhits.show()

+----+---+---+
|Year|  H| AB|
+----+---+---+
|2014| 36|213|
|2015| 22| 76|
|2016|115|421|
|2017|128|469|
|2018|176|606|
+----+---+---+



In [59]:
Ahits = sqlc.sql(""" SELECT Year,H,AB FROM andre """)
Ahits.show()

+----+---+---+
|Year|  H| AB|
+----+---+---+
|1976| 20| 85|
|1977|148|525|
|1978|154|609|
|1979|176|639|
|1980|178|577|
|1981|119|394|
|1982|183|608|
|1983|189|633|
|1984|132|533|
|1985|135|529|
|1986|141|496|
|1987|178|621|
|1988|179|591|
|1989|105|416|
|1990|164|529|
|1991|153|563|
|1992|150|542|
|1993|126|461|
|1994| 70|292|
|1995| 58|226|
+----+---+---+
only showing top 20 rows



## Database Operations: joins

In [69]:
df = Bhits.join(Jhits,Bhits.Year==Jhits.Year).join(Ahits,Bhits.Year==Ahits.Year,'outer')
print(df)
df.explain(True)

DataFrame[Year: int, H: int, AB: int, Year: int, H: int, AB: int, Year: int, H: int, AB: int]
== Parsed Logical Plan ==
Join FullOuter, (Year#888 = Year#1034)
:- Join Inner, (Year#888 = Year#961)
:  :- Project [Year#888, H#896, AB#894]
:  :  +- SubqueryAlias bryce
:  :     +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv
:  +- Project [Year#961, H#969, AB#967]
:     +- SubqueryAlias javyb
:        +- Relation[Year#961,Age#962,Tm#963,Lg#964,G#965,PA#966,AB#967,R#968,H#969,2B#970,3B#971,HR#972,RBI#973,SB#974,CS#975,BB#976,SO#977,BA#978,OBP#979,SLG#980,OPS#981,OPS+#982,TB#983,GDP#984,... 6 more fields] csv
+- Project [Year#1034, H#1042, AB#1040]
   +- SubqueryAlias andre
      +- Relation[Year#1034,Age#1035,Tm#1036,Lg#1037,G#1038,PA#1039,AB#1040,R#1041,H#1042,2B#1043,3B#1044,HR#1045,RBI#1046,SB#1047,CS#1048,BB#1049,SO#1050,BA#1051

In [71]:
df.show()

+----+----+----+----+----+----+----+----+----+
|Year|   H|  AB|Year|   H|  AB|Year|   H|  AB|
+----+----+----+----+----+----+----+----+----+
|null|null|null|null|null|null|1990| 164| 529|
|null|null|null|null|null|null|1977| 148| 525|
|2018| 137| 550|2018| 176| 606|null|null|null|
|2015| 172| 521|2015|  22|  76|null|null|null|
|null|null|null|null|null|null|1978| 154| 609|
|null|null|null|null|null|null|1988| 179| 591|
|null|null|null|null|null|null|1994|  70| 292|
|2014|  96| 352|2014|  36| 213|null|null|null|
|null|null|null|null|null|null|1979| 176| 639|
|null|null|null|null|null|null|1991| 153| 563|
|null|null|null|null|null|null|1982| 183| 608|
|null|null|null|null|null|null|1989| 105| 416|
|null|null|null|null|null|null|1996|  16|  58|
|null|null|null|null|null|null|1985| 135| 529|
|null|null|null|null|null|null|1987| 178| 621|
|2016| 123| 506|2016| 115| 421|null|null|null|
|null|null|null|null|null|null|1995|  58| 226|
|null|null|null|null|null|null|1980| 178| 577|
|null|null|nu
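
The nulls come from the outer join: a year that appears on only one side still produces a row, with the other side's columns left empty. The following plain-Python sketch shows that behaviour on a simpler case than the notebook's three-way join (an assumption made for brevity): a full outer join of the Bryce and Javy rows shown earlier, keyed on `Year`. The helper `full_outer_join` is hypothetical and assumes each year appears at most once per table.

```python
# Year -> (H, AB), copied from the Bhits and Jhits outputs.
bryce = {2012: (144, 533), 2013: (116, 424), 2014: (96, 352), 2015: (172, 521),
         2016: (123, 506), 2017: (134, 420), 2018: (137, 550)}
javyb = {2014: (36, 213), 2015: (22, 76), 2016: (115, 421),
         2017: (128, 469), 2018: (176, 606)}

def full_outer_join(left, right):
    """One row per year found in either input; a missing side becomes None."""
    empty = (None, None, None)
    rows = []
    for year in sorted(set(left) | set(right)):
        left_part = (year, *left[year]) if year in left else empty
        right_part = (year, *right[year]) if year in right else empty
        rows.append(left_part + right_part)
    return rows

outer = full_outer_join(bryce, javyb)
for row in outer:
    print(row)
```

Years 2012 and 2013 exist only in the Bryce data, so their Javy columns are `None`. From 2014 on, both sides are filled in.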

In [73]:
df = Bhits.join(Jhits,Bhits.Year==Jhits.Year)
print(df)
df.show()

DataFrame[Year: int, H: int, AB: int, Year: int, H: int, AB: int]
+----+---+---+----+---+---+
|Year|  H| AB|Year|  H| AB|
+----+---+---+----+---+---+
|2014| 96|352|2014| 36|213|
|2015|172|521|2015| 22| 76|
|2016|123|506|2016|115|421|
|2017|134|420|2017|128|469|
|2018|137|550|2018|176|606|
+----+---+---+----+---+---+
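
The joined DataFrame carries two columns each named `Year`, `H`, and `AB`, which makes later references ambiguous. One way to avoid this is to alias the columns in the query itself. The sketch below expresses the same inner join in SQL with explicit aliases, run through Python's built-in `sqlite3` as a stand-in for Spark SQL. The alias names `H_bryce`, `AB_bryce`, `H_javyb`, and `AB_javyb` are illustrative choices, not names from the notebook.

```python
import sqlite3

# (Year, H, AB) rows copied from the Bhits and Jhits outputs.
bryce_rows = [
    (2012, 144, 533), (2013, 116, 424), (2014, 96, 352), (2015, 172, 521),
    (2016, 123, 506), (2017, 134, 420), (2018, 137, 550),
]
javyb_rows = [
    (2014, 36, 213), (2015, 22, 76), (2016, 115, 421),
    (2017, 128, 469), (2018, 176, 606),
]

conn = sqlite3.connect(":memory:")
for name, rows in (("bryce", bryce_rows), ("javyb", javyb_rows)):
    conn.execute(f"CREATE TABLE {name} (Year INTEGER, H INTEGER, AB INTEGER)")
    conn.executemany(f"INSERT INTO {name} VALUES (?, ?, ?)", rows)

# Inner join on Year; aliases keep every output column name unique.
joined = conn.execute(
    """
    SELECT b.Year, b.H AS H_bryce, b.AB AS AB_bryce,
           j.H AS H_javyb, j.AB AS AB_javyb
    FROM bryce AS b
    JOIN javyb AS j ON b.Year = j.Year
    ORDER BY b.Year
    """
).fetchall()
for row in joined:
    print(row)
conn.close()
```

Only the five seasons present in both tables (2014 to 2018) survive the inner join, matching the output above.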



## Analysis: aggregate, calculate

In [79]:
CH = Bhits.groupBy().agg(sf.sum('H').alias('CH'))
CAB = Bhits.groupBy().agg(sf.sum('AB').alias('CAB'))

In [80]:
CH.explain(True)

== Parsed Logical Plan ==
'Aggregate [sum('H) AS CH#1990]
+- Project [Year#888, H#896, AB#894]
   +- SubqueryAlias bryce
      +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv

== Analyzed Logical Plan ==
CH: bigint
Aggregate [sum(cast(H#896 as bigint)) AS CH#1990L]
+- Project [Year#888, H#896, AB#894]
   +- SubqueryAlias bryce
      +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,... 6 more fields] csv

== Optimized Logical Plan ==
Aggregate [sum(cast(H#896 as bigint)) AS CH#1990L]
+- Project [H#896]
   +- Relation[Year#888,Age#889,Tm#890,Lg#891,G#892,PA#893,AB#894,R#895,H#896,2B#897,3B#898,HR#899,RBI#900,SB#901,CS#902,BB#903,SO#904,BA#905,OBP#906,SLG#907,OPS#908,OPS+#909,TB#910,GDP#911,...

In [82]:
CH.show()
CAB.show()

+---+
| CH|
+---+
|922|
+---+

+----+
| CAB|
+----+
|3306|
+----+



In [88]:
# Career average = total hits / total at-bats; first() returns the single aggregate row
rawAVG = CH.first()[0] / CAB.first()[0]
print(rawAVG)

0.278886872353297


In [89]:
round(rawAVG,3)

0.279
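
As a cross-check, the same calculation can be done in plain Python on the seven season rows shown earlier. The sketch also computes the unweighted mean of the per-season averages, to illustrate why the notebook sums hits and at-bats first: seasons with more at-bats should count for more. Variable names here are illustrative.

```python
# (Year, H, AB) rows copied from the Bhits output.
seasons = [
    (2012, 144, 533), (2013, 116, 424), (2014, 96, 352), (2015, 172, 521),
    (2016, 123, 506), (2017, 134, 420), (2018, 137, 550),
]

total_hits = sum(h for _, h, _ in seasons)
total_at_bats = sum(ab for _, _, ab in seasons)

# Career average: aggregate first, then divide.
career_avg = total_hits / total_at_bats

# Unweighted mean of season averages: treats every season equally.
mean_of_season_avgs = sum(h / ab for _, h, ab in seasons) / len(seasons)

print(total_hits, total_at_bats, round(career_avg, 3))  # 922 3306 0.279
print(mean_of_season_avgs)
```

The two numbers differ slightly, because the unweighted mean gives a 352 at-bat season the same weight as a 550 at-bat one.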

# Bryce Harper career batting average (2012-2018): 0.279