In [0]:
## Spark Basic P.1

In [0]:
from pyspark.sql import SparkSession

In [0]:
# May take a little while on a local computer
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [0]:
# We'll discuss how to read other options later.
# This dataset is from Spark's examples

# Might be a little slow locally
df = sqlContext.sql("SELECT * FROM people_json")
#df = spark.read.json('people_json.json')

In [0]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [0]:
df.columns

Out[36]: ['age', 'name']

In [0]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [0]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType

In [0]:
## Define a proper schema and modify column , 

In [0]:
schema = StructType([ StructField("age",  IntegerType(), True), StructField("name", StringType(), True)])

In [0]:
df = sqlContext.createDataFrame(df.collect()  , schema)

In [0]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [0]:
## Spark Basic P.2

In [0]:
df['age']

Out[44]: Column<'age'>

In [0]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [0]:
type(df["age"])

Out[46]: pyspark.sql.column.Column

In [0]:
df.head(2)

Out[51]: [Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [0]:
df.select(["age","name"]).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
df.withColumn("double_age", df["age"]*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [0]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
df= df.withColumnRenamed("age", "ny_new_funk_age")

In [0]:
df.show()

+---------------+-------+
|ny_new_funk_age|   name|
+---------------+-------+
|           null|Michael|
|             30|   Andy|
|             19| Justin|
+---------------+-------+



In [0]:
df.createOrReplaceTempView('people')

In [0]:
results= spark.sql("select * from people")

In [0]:
results.show()

+---------------+-------+
|ny_new_funk_age|   name|
+---------------+-------+
|           null|Michael|
|             30|   Andy|
|             19| Justin|
+---------------+-------+



In [0]:
## Spark Basic Operation P.3

In [0]:
from pyspark.sql import SparkSession

In [0]:
# May take awhile locally
spark = SparkSession.builder.appName("Operations").getOrCreate()

In [0]:
# Let Spark know about the header and infer the Schema types!
df = spark.read.csv('/FileStore/tables/appl_stock.csv',inferSchema=True,header=True)

In [0]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [0]:
# Using SQL
df.filter("Close<500").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [0]:
# Using SQL with .select()
df.filter("Close<500").select(['Open','Close']).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [0]:
df.filter(df["Close"] < 200).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.3

In [0]:
# Make sure to add in the parenthesis separating the statements!
## & and 
df.filter( (df["Close"] < 200) & (df['Open'] > 200) ).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [0]:
# Make sure to add in the parenthesis separating the statements!
## | or 
df.filter( (df["Close"] < 200) | (df['Open'] > 200) ).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [0]:
# Make sure to add in the parenthesis separating the statements!
## ~ not 
df.filter( (df["Close"] < 200) & ~(df['Open'] < 200) ).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [0]:
## equivalent of iloc 

df.collect()[0]["Volume"] 

Out[99]: 123432400

In [0]:
### Convert a Spark Df to Pandas Df 


df_pandas = df.toPandas()
print(df_pandas)

           Date        Open        High         Low       Close     Volume  \
0    2010-01-04  213.429998  214.499996  212.380001  214.009998  123432400   
1    2010-01-05  214.599998  215.589994  213.249994  214.379993  150476200   
2    2010-01-06  214.379993  215.230000  210.750004  210.969995  138040000   
3    2010-01-07  211.750000  212.000006  209.050005  210.580000  119282800   
4    2010-01-08  210.299994  212.000006  209.060005  211.980005  111902700   
...         ...         ...         ...         ...         ...        ...   
1757 2016-12-23  115.589996  116.519997  115.589996  116.519997   14249500   
1758 2016-12-27  116.519997  117.800003  116.489998  117.260002   18296900   
1759 2016-12-28  117.519997  118.019997  116.199997  116.760002   20905900   
1760 2016-12-29  116.449997  117.110001  116.400002  116.730003   15039500   
1761 2016-12-30  116.650002  117.199997  115.430000  115.820000   30586300   

       Adj Close  
0      27.727039  
1      27.774976  
2     

In [0]:
df_pandas.iloc[:,:]


Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2010-01-04,213.429998,214.499996,212.380001,214.009998,123432400,27.727039
1,2010-01-05,214.599998,215.589994,213.249994,214.379993,150476200,27.774976
2,2010-01-06,214.379993,215.230000,210.750004,210.969995,138040000,27.333178
3,2010-01-07,211.750000,212.000006,209.050005,210.580000,119282800,27.282650
4,2010-01-08,210.299994,212.000006,209.060005,211.980005,111902700,27.464034
...,...,...,...,...,...,...,...
1757,2016-12-23,115.589996,116.519997,115.589996,116.519997,14249500,116.016995
1758,2016-12-27,116.519997,117.800003,116.489998,117.260002,18296900,116.753806
1759,2016-12-28,117.519997,118.019997,116.199997,116.760002,20905900,116.255965
1760,2016-12-29,116.449997,117.110001,116.400002,116.730003,15039500,116.226096


In [0]:
### part 3 - Group By and Aggregation 

In [0]:
from pyspark.sql import SparkSession

In [0]:
# May take a little while on a local computer
spark = SparkSession.builder.appName("groupbyagg").getOrCreate()

In [0]:
df = spark.read.csv('/FileStore/tables/sales_info.csv',inferSchema=True,header=True)

In [0]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [0]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [0]:
df.groupBy("Company")

Out[122]: <pyspark.sql.group.GroupedData at 0x7f7bfdc84f40>

In [0]:
df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [0]:
df.groupBy("company").count().show()

+-------+-----+
|company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [0]:
df.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [0]:
df.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [0]:
from pyspark.sql.functions import countDistinct , avg , stddev

In [0]:
df.select(stddev("Sales")).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [0]:
df.select(countDistinct("Sales")).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [0]:
df.select(countDistinct("Sales").alias("Distinct Sales")).show()

+--------------+
|Distinct Sales|
+--------------+
|            11|
+--------------+



In [0]:
from pyspark.sql.functions import format_number

In [0]:
sales_std = df.select(stddev("Sales").alias('std'))

In [0]:
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [0]:
sales_std.select(format_number('std',2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



In [0]:
# OrderBy
# Ascending
df.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [0]:
# Descending call off the column itself.
df.orderBy(df["Sales"].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [0]:
##Spark Missing Data P.4

In [0]:
from pyspark.sql import SparkSession
# May take a little while on a local computer
spark = SparkSession.builder.appName("missingdata").getOrCreate()

In [0]:
#/FileStore/tables/ContainsNull.csv

df = spark.read.csv("/FileStore/tables/ContainsNull.csv",header=True,inferSchema=True)

In [0]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

