## Apache Spark

In [38]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [2]:
import findspark
findspark.init()

In [3]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [4]:
spark

#### Old Implementation

In [12]:
df = spark.read.csv("biostats.csv")

+----+----------+------+--------------+---------------+
| _c0|       _c1|   _c2|           _c3|            _c4|
+----+----------+------+--------------+---------------+
|Name|     "Sex"| "Age"| "Height (in)"| "Weight (lbs)"|
|Alex|       "M"|    41|            74|            170|
|Bert|       "M"|    42|            68|            166|
|Carl|       "M"|    32|            70|            155|
|Dave|       "M"|    39|            72|            167|
|Elly|       "F"|    30|            66|            124|
|Fran|       "F"|    33|            66|            115|
|Gwen|       "F"|    26|            64|            121|
|Hank|       "M"|    30|            71|            158|
|Ivan|       "M"|    53|            72|            175|
|Jake|       "M"|    32|            69|            143|
|Kate|       "F"|    47|            69|            139|
|Luke|       "M"|    34|            72|            163|
|Myra|       "F"|    23|            62|             98|
|Neil|       "M"|    36|            75|         

#### New Implementation

In [21]:
data_schema = [StructField("Name", StringType(), True), StructField("Sex", StringType(), True), StructField("Age", StringType(), True), StructField("Height", StringType(), True), StructField("Weight", StringType(), True)]
final_struct = StructType(fields=data_schema)
df=spark.read.csv("biostats.csv", schema=final_struct)
df.show()

+----+----------+-----+---------+---------+
|Name|       Sex|  Age|   Height|   Weight|
+----+----------+-----+---------+---------+
|Alex|       "M"|   41|       74|      170|
|Bert|       "M"|   42|       68|      166|
|Carl|       "M"|   32|       70|      155|
|Dave|       "M"|   39|       72|      167|
|Elly|       "F"|   30|       66|      124|
|Fran|       "F"|   33|       66|      115|
|Gwen|       "F"|   26|       64|      121|
|Hank|       "M"|   30|       71|      158|
|Ivan|       "M"|   53|       72|      175|
|Jake|       "M"|   32|       69|      143|
|Kate|       "F"|   47|       69|      139|
|Luke|       "M"|   34|       72|      163|
|Myra|       "F"|   23|       62|       98|
|Neil|       "M"|   36|       75|      160|
|Omar|       "M"|   38|       70|      145|
|Page|       "F"|   31|       67|      135|
|Quin|       "M"|   29|       71|      176|
|Ruth|       "F"|   28|       65|      131|
+----+----------+-----+---------+---------+



In [23]:
df.select("Height").show()

+---------+
|   Height|
+---------+
|       74|
|       68|
|       70|
|       72|
|       66|
|       66|
|       64|
|       71|
|       72|
|       69|
|       69|
|       72|
|       62|
|       75|
|       70|
|       67|
|       71|
|       65|
+---------+



### Using SQL

In [25]:
# Registering the dataframe we've created as a SQL temporary view
df.createOrReplaceTempView("tempview")
sql_results = spark.sql("SELECT * FROM tempview")
sql_results.show()

+----+----------+-----+---------+---------+
|Name|       Sex|  Age|   Height|   Weight|
+----+----------+-----+---------+---------+
|Alex|       "M"|   41|       74|      170|
|Bert|       "M"|   42|       68|      166|
|Carl|       "M"|   32|       70|      155|
|Dave|       "M"|   39|       72|      167|
|Fran|       "F"|   33|       66|      115|
|Ivan|       "M"|   53|       72|      175|
|Jake|       "M"|   32|       69|      143|
|Kate|       "F"|   47|       69|      139|
|Luke|       "M"|   34|       72|      163|
|Neil|       "M"|   36|       75|      160|
|Omar|       "M"|   38|       70|      145|
|Page|       "F"|   31|       67|      135|
+----+----------+-----+---------+---------+



### RDD Operations [Resilient Distributed Dataset]

In [47]:
sc = spark.sparkContext
lines = sc.textFile("biostatsmodif.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], sex=p[1], age=int(p[2]), height=int(p[3]), weight=int(p[4])))

schemaPeople = spark.createDataFrame(people)
schemaPeople.show()

+----+---+---+------+------+
|name|sex|age|height|weight|
+----+---+---+------+------+
|Alex|  M| 41|    74|   170|
|Bert|  M| 42|    68|   166|
|Carl|  M| 32|    70|   155|
|Dave|  M| 39|    72|   167|
|Elly|  F| 30|    66|   124|
|Fran|  F| 33|    66|   115|
|Gwen|  F| 26|    64|   121|
|Hank|  M| 30|    71|   158|
|Ivan|  M| 53|    72|   175|
|Jake|  M| 32|    69|   143|
|Kate|  F| 47|    69|   139|
|Luke|  M| 34|    72|   163|
|Myra|  F| 23|    62|    98|
|Neil|  M| 36|    75|   160|
|Omar|  M| 38|    70|   145|
|Page|  F| 31|    67|   135|
|Quin|  M| 29|    71|   176|
|Ruth|  F| 28|    65|   131|
+----+---+---+------+------+

