In [2]:
import os

import findspark

# Point findspark at the Spark installation before pyspark is imported.
# SPARK_HOME, when set, takes precedence so the notebook is not tied to one
# machine; otherwise fall back to the install path this notebook was written
# against.
findspark.init(os.environ.get('SPARK_HOME',
                              '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'))
import pyspark
from pyspark.sql import SparkSession

# Session used by every cell below (named 'basics').
spark = SparkSession.builder.appName('basics').getOrCreate()

import seaborn as sn
import yellowbrick

### Data Import and Exploration
    import types from pyspark.sql.types:
    StructField('col', DataType, nullable=True)
    StructType(fields=StructField)
    spark.read.csv('file', schema=StructType): import data with a self-defined schema
    spark.read.csv('file', inferSchema=True): import data with an inferred schema (csv only)
    df.show(): visualise data frame
    df.columns: check columns (an attribute, so no parentheses)
    df.count(): check number of rows
    df.describe(['col']).show(): summary of col
    df.printSchema(): check data type

In [80]:
# Load the CSV with no schema supplied; the first line is treated as the header.
df = spark.read.options(header=True).csv('dataset.csv')

In [212]:
from pyspark.sql.types import (StructField, StructType,
                               TimestampType, IntegerType, FloatType)
# define data schema (or use inferSchema=True when loading dataframe)
# Each StructField is (column name, data type, nullable).
data_schema = [StructField('instant', IntegerType(), True),
               StructField('dteday', TimestampType(), True),
               StructField('season', IntegerType(), True),
               StructField('yr', IntegerType(), True),
               StructField('mnth', IntegerType(), True),
               StructField('hr', IntegerType(), True),
               StructField('holiday', IntegerType(), True),
               StructField('weekday', IntegerType(), True),
               StructField('workingday', IntegerType(), True),
               StructField('weathersit', IntegerType(), True),
               StructField('temp', FloatType(), True),
               StructField('atemp', FloatType(), True),
               StructField('hum', FloatType(), True),
               StructField('windspeed', FloatType(), True),
               StructField('casual', IntegerType(), True),
               StructField('registered', IntegerType(), True),
               StructField('cnt', IntegerType(), True)]

final_struct = StructType(fields=data_schema)
# import with self-defined schema
# dteday is declared as a timestamp, but the values displayed in this notebook
# look like '2011/1/1', which the reader's default timestamp pattern does not
# accept, so the pattern is given explicitly.
# NOTE(review): assumes every row is year/month/day without zero padding --
# confirm against the full file.
df = spark.read.csv('dataset.csv', schema=final_struct, header=True,
                    timestampFormat='yyyy/M/d')

In [268]:
# Let the reader infer the column types itself (inferSchema applies to csv only)
df = spark.read.options(header=True, inferSchema=True).csv('dataset.csv')

In [172]:
# Print the first three rows as a table
df.show(n=3)
# Return the first row as a one-element list of Row objects
df.take(1)

+-------+--------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|  dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+--------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|   null|2011/1/1|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0|     3|        13| 16|
|      2|2011/1/1|     1|  0|   1|  1|      0|      6|         0|         1|0.22|  null| 0.8|      0.0|     8|        32| 40|
|      3|2011/1/1|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727|null|      0.0|     5|        27| 32|
+-------+--------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
only showing top 3 rows



[Row(instant=None, dteday='2011/1/1', season=1, yr=0, mnth=1, hr=0, holiday=0, weekday=6, workingday=0, weathersit=1, temp=0.24, atemp=0.2879, hum=0.81, windspeed=0.0, casual=3, registered=13, cnt=16)]

In [170]:
# number of rows in the data frame
df.count()

17379

In [169]:
# column names as a Python list (attribute access, no parentheses)
df.columns

['instant',
 'dteday',
 'season',
 'yr',
 'mnth',
 'hr',
 'holiday',
 'weekday',
 'workingday',
 'weathersit',
 'temp',
 'atemp',
 'hum',
 'windspeed',
 'casual',
 'registered',
 'cnt']

In [44]:
# Summary statistics (count, mean, stddev, min, max) for selected columns
summary_cols = ['instant', 'temp', 'atemp', 'hum', 'windspeed', 'cnt']
df.describe(summary_cols).show()

+-------+-----------------+-------------------+-------------------+-------------------+-------------------+------------------+
|summary|          instant|               temp|              atemp|                hum|          windspeed|               cnt|
+-------+-----------------+-------------------+-------------------+-------------------+-------------------+------------------+
|  count|            13411|              16879|              16655|              16664|              17379|             17379|
|   mean|8742.028782342853|0.49789324210464186|0.47718728662814347| 0.6233845399098004|0.19009760539747023|189.46308763450142|
| stddev|5057.902777166932| 0.1919856962831364|0.17226178912704154|0.19323537228920643|0.12234022926320468| 181.3875990918646|
|    min|                2|               0.02|                0.0|                0.0|                0.0|                 1|
|    max|            17379|                1.0|                1.0|                1.0|             0.8507|    

In [213]:
# data type
# printSchema() prints the schema as a tree (name, type, nullable);
# dtypes gives the same information as a list of (column, type) pairs
df.printSchema()
df.dtypes

root
 |-- instant: integer (nullable = true)
 |-- dteday: timestamp (nullable = true)
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: float (nullable = true)
 |-- atemp: float (nullable = true)
 |-- hum: float (nullable = true)
 |-- windspeed: float (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- cnt: integer (nullable = true)



[('instant', 'int'),
 ('dteday', 'timestamp'),
 ('season', 'int'),
 ('yr', 'int'),
 ('mnth', 'int'),
 ('hr', 'int'),
 ('holiday', 'int'),
 ('weekday', 'int'),
 ('workingday', 'int'),
 ('weathersit', 'int'),
 ('temp', 'float'),
 ('atemp', 'float'),
 ('hum', 'float'),
 ('windspeed', 'float'),
 ('casual', 'int'),
 ('registered', 'int'),
 ('cnt', 'int')]

### Data Operation
    df.col / df['col'] / df.select('col'): subset
    df.withColumn('col', fun): add column
    df.withColumnRenamed('col', 'new_col'): rename col
    df.filter(condition): filter rows
    df.collect(): return the rows as a Python list of Row objects
    df.groupBy('col'): group data by values in 'col'
    df.groupBy('col1').fun('col2'): group by 'col1' then do fun on 'col2'
    df.agg({'col':"fun"}): do fun on values in 'col' (not grouping)
    df.orderBy('col'): sort data by values in 'col' (same as df.sort())
    df.sort('col', ascending=False) or df.sort(df.col.desc())

In [73]:
# Dataframe subset
# Three ways to summarise only some of the columns:
# 1. select() the column, then describe()
df.select('instant').describe().show()
# 2. index the data frame with column names, then describe()
df['temp', 'atemp'].describe().show()
# 3. pass the column names straight to describe()
df.describe(['hum', 'windspeed']).show()

+-------+-----------------+
|summary|          instant|
+-------+-----------------+
|  count|            13411|
|   mean|8742.028782342853|
| stddev|5057.902777166932|
|    min|                2|
|    max|            17379|
+-------+-----------------+

+-------+-------------------+-------------------+
|summary|               temp|              atemp|
+-------+-------------------+-------------------+
|  count|              16879|              16655|
|   mean|0.49789324210464186|0.47718728662814347|
| stddev| 0.1919856962831364|0.17226178912704154|
|    min|               0.02|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+

+-------+-------------------+-------------------+
|summary|                hum|          windspeed|
+-------+-------------------+-------------------+
|  count|              16664|              17379|
|   mean| 0.6233845399098004|0.19009760539747023|
| stddev|0.19323537228920643|0.122340229263204

In [55]:
# Col Rename
# Show the first two rows with 'yr' displayed as 'year'. The result is not
# assigned, so df itself keeps the 'yr' column that later cells group by.
df.withColumnRenamed('yr', 'year').show(2)

+-------+--------+------+----+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|  dteday|season|year|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+--------+------+----+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|   null|2011/1/1|     1|   0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0|     3|        13| 16|
|      2|2011/1/1|     1|   0|   1|  1|      0|      6|         0|         1|0.22|  null| 0.8|      0.0|     8|        32| 40|
|      3|2011/1/1|     1|   0|   1|  2|      0|      6|         0|         1|0.22|0.2727|null|      0.0|     5|        27| 32|
|      4|2011/1/1|     1|   0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|     3|        10| 13|
|      5|2011/1/1|     1|   0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|     0|

In [187]:
# data frame subsetting
# Three spellings of the same two-column selection; only the last
# expression of the cell is echoed as output.
# by column name
df.select('cnt', 'yr')
# by column attribute
df.select(df.cnt, df.yr)
# by indexing the data frame
df['cnt', 'yr']

DataFrame[cnt: int, yr: int]

In [115]:
# Register df as a temporary view named 'bikesharing' so it can be queried with SQL
df.createOrReplaceTempView('bikesharing')
# The query result is itself a data frame
results = spark.sql('SELECT cnt FROM bikesharing')
results.show(n=3)

+---+
|cnt|
+---+
| 16|
| 40|
| 32|
+---+
only showing top 3 rows



In [180]:
# Count rows two ways: rows whose temp is greater than 0 ...
temp_positive = df.where(df['temp'] > 0)
print(temp_positive.count())
# ... and rows left after dropping those with a missing temp
temp_present = df.dropna(subset='temp')
print(temp_present.count())

16879
16879


In [208]:
# .collect() returns a list (with []) of Row objects
# .asDict() turns one Row into a dict (with {}) keyed by column name
# [0] picks the first collected row
df.select('cnt', 'yr').collect()[0].asDict()

{'cnt': 16, 'yr': 0}

In [280]:
# format_number is not imported anywhere else in the notebook, so bring it
# into scope here to keep this cell runnable on a fresh kernel.
from pyspark.sql.functions import format_number

# group data by year and return mean of count
df.groupBy('yr').mean('cnt').show()
# group data by year and take the mean of each column, then keep only the
# cnt average formatted to two decimal places
df.groupBy('yr').mean().select('yr', format_number('avg(cnt)', 2)).show()

+---+------------------+
| yr|          avg(cnt)|
+---+------------------+
|  1| 234.6663613464621|
|  0|143.79444765760556|
+---+------------------+

+---+--------------------------+
| yr|format_number(avg(cnt), 2)|
+---+--------------------------+
|  1|                    234.67|
|  0|                    143.79|
+---+--------------------------+



In [281]:
# not grouping, on entire data
# the dict maps a column name to the name of the aggregate to apply to it
df.agg({'cnt':'mean'}).show()

+------------------+
|          avg(cnt)|
+------------------+
|189.46308763450142|
+------------------+



In [294]:
# Average count per season; the aggregate column is renamed for readability
season_means = df.groupBy('season').mean('cnt')
df_season = season_means.withColumnRenamed('avg(cnt)', 'average of count')
# ascending is the default order
df_season.orderBy('season').show()
# two equivalent ways to sort in descending order
df_season.orderBy('season', ascending=False).show()
df_season.sort(df_season['season'].desc()).show()

+------+------------------+
|season|  average of count|
+------+------------------+
|     1|111.11456859971712|
|     2|208.34406894987526|
|     3|236.01623665480426|
|     4|198.86885633270322|
+------+------------------+

+------+------------------+
|season|  average of count|
+------+------------------+
|     4|198.86885633270322|
|     3|236.01623665480426|
|     2|208.34406894987526|
|     1|111.11456859971712|
+------+------------------+

+------+------------------+
|season|  average of count|
+------+------------------+
|     4|198.86885633270322|
|     3|236.01623665480426|
|     2|208.34406894987526|
|     1|111.11456859971712|
+------+------------------+



### Data Cleaning
    df.drop('col'): drop columns
    df.na.drop(thresh, how, subset): drop rows with na
    df.na.fill('value', subset): better to fill missing values with the mean

In [158]:
# schema of the data frame with the 'instant' column dropped
# (the result is not assigned, so df is left as it was)
df.drop('instant').printSchema()

root
 |-- dteday: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- cnt: integer (nullable = true)



In [160]:
# total number of rows, for comparison
print(df.count())
# drop any rows with missing data
print(df.na.drop().count())
# drop a row only if its value in a particular column ('instant') is missing
print(df.na.drop(subset='instant').count())

17379
12069
13411


In [131]:
from pyspark.sql.functions import mean
# Mean of each column with missing values; every result is still the list
# that collect() returns, not a bare number.
mean_temp = df.select(mean('temp')).collect()
mean_atemp = df.select(mean('atemp')).collect()
mean_hum = df.select(mean('hum')).collect()
mean_windspeed = df.select(mean('windspeed')).collect()

In [134]:
# it's a list
type(mean_temp)
# first row of the list, first field of that row
mean_temp[0][0]
# get the value
mean_temp_value = mean_temp[0][0]

0.49789324012086394

In [188]:
# do it again, this time keeping the plain values instead of the collected lists
from pyspark.sql.functions import mean
# A single aggregation computes all four means in one pass over the data
# instead of launching a separate job per column. The aggregate has no
# grouping, so the result is exactly one row.
means_row = df.select(mean(df.temp), mean(df.atemp),
                      mean(df.hum), mean(df.windspeed)).collect()[0]
# unpack the row's four fields in the order they were selected
mean_temp, mean_atemp, mean_hum, mean_windspeed = means_row

In [189]:
# replace missing temp values with the column mean, then summarise the
# filled column (the result is not assigned, so df is unchanged)
df.na.fill(mean_temp, subset='temp').describe('temp').show()

+-------+-------------------+
|summary|               temp|
+-------+-------------------+
|  count|              17379|
|   mean|0.49789324012086117|
| stddev|0.18920363048478592|
|    min|               0.02|
|    max|                1.0|
+-------+-------------------+



In [162]:
# Four columns need filling: a dict maps each column name to its own fill value
mean_filled = {
    'temp': mean_temp,
    'atemp': mean_atemp,
    'hum': mean_hum,
    'windspeed': mean_windspeed,
}
# fill first, then summarise the four filled columns
df_mean_filled = df.na.fill(mean_filled)
df_mean_filled.describe(['temp', 'atemp', 'hum', 'windspeed']).show()

+-------+-------------------+-------------------+-------------------+-------------------+
|summary|               temp|              atemp|                hum|          windspeed|
+-------+-------------------+-------------------+-------------------+-------------------+
|  count|              17379|              17379|              17379|              17379|
|   mean|0.49789324012086117| 0.4771872830981634|   0.62338454152664| 0.1900976063064631|
| stddev|0.18920363048478592|0.16863523924975896|0.18921838489870607|0.12234022857279034|
|    min|               0.02|                0.0|                0.0|                0.0|
|    max|                1.0|                1.0|                1.0|             0.8507|
+-------+-------------------+-------------------+-------------------+-------------------+

