### Handling Big Data with PySpark

Download a Parquet file and load it into a Spark DataFrame. Once the DataFrame is registered as a temporary view, Spark SQL lets us query it like a database table.

In [1]:
import pyspark.sql.functions

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20191216092926-0000
KERNEL_ID = 7a51d5b5-3539-453f-af37-8488d9c75a91


In [None]:
# Google Images: search "parquet file"
# Google Images: search "spark rdd"

# wget - download the file
# mv - rename the downloaded file (wget saves it as washing.parquet?raw=true)

In [2]:
!wget https://github.com/rahulsnair/sample_parquet/blob/master/washing.parquet?raw=true
!mv washing.parquet?raw=true washing.parquet

--2019-12-16 09:29:32--  https://github.com/rahulsnair/sample_parquet/blob/master/washing.parquet?raw=true
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/rahulsnair/sample_parquet/raw/master/washing.parquet [following]
--2019-12-16 09:29:32--  https://github.com/rahulsnair/sample_parquet/raw/master/washing.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/rahulsnair/sample_parquet/master/washing.parquet [following]
--2019-12-16 09:29:32--  https://raw.githubusercontent.com/rahulsnair/sample_parquet/master/washing.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response..

In [3]:
# Load the Parquet file into a Spark DataFrame.
df = spark.read.parquet('washing.parquet')

# Register the DataFrame as a temporary view named "washing" so it can be
# queried with SQL later, e.g. spark.sql('SELECT ... FROM washing').
df.createOrReplaceTempView('washing')

###########################
# Exercise: ask students to display different numbers of rows, e.g. df.show(5).
# Called without an argument, show() prints the first 20 rows.
df.show()

+--------------------+--------------------+-----+--------+----------+---------+--------+-----+-----------+-------------+-------+
|                 _id|                _rev|count|flowrate|fluidlevel|frequency|hardness|speed|temperature|           ts|voltage|
+--------------------+--------------------+-----+--------+----------+---------+--------+-----+-----------+-------------+-------+
|0d86485d0f88d1f9d...|1-57940679fb8a713...|    4|      11|acceptable|     null|      77| null|        100|1547808723923|   null|
|0d86485d0f88d1f9d...|1-15ff3a0b304d789...|    2|    null|      null|     null|    null| 1046|       null|1547808729917|   null|
|0d86485d0f88d1f9d...|1-97c2742b68c7b07...|    4|    null|      null|       71|    null| null|       null|1547808731918|    236|
|0d86485d0f88d1f9d...|1-eefb903dbe45746...|   19|      11|acceptable|     null|      75| null|         86|1547808738999|   null|
|0d86485d0f88d1f9d...|1-5f68b4c72813c25...|    7|    null|      null|       75|    null| null|   

In [4]:
# function that returns the number of rows in the parquet file
def count(df, spark):
    """Return the number of rows in ``df``.

    The DataFrame that is passed in is counted directly. The previous version
    ignored ``df`` and always queried the hard-coded ``washing`` temporary
    view, so it gave wrong answers for any other DataFrame and failed if the
    view had not been registered.

    NOTE: ``from pyspark.sql.functions import count`` rebinds this name to the
    Spark aggregate function; import that module under an alias instead.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame whose rows are counted.
    spark : pyspark.sql.SparkSession
        Unused; kept so existing ``count(df, spark)`` calls keep working.

    Returns
    -------
    int
        Number of rows in ``df``.
    """
    return df.count()

In [5]:
# function that returns the number of columns in the parquet file
def getNumberOfFields(df, spark):
    """Return how many columns ``df`` has.

    ``spark`` is not used; it is accepted only to keep the ``(df, spark)``
    call signature shared by the helpers in this notebook.
    """
    column_names = df.columns
    return len(column_names)

In [6]:
# function that returns the list of column names in the parquet file
def getFieldNames(df, spark):
    """Return the list of column names of ``df``.

    ``spark`` is unused and kept only for call-signature compatibility.
    """
    names = df.columns
    return names

Test count function

In [7]:
# Reset the three result variables used by the test cells.
cnt, nof, fn = None, None, None

# Row count of the washing data via the count helper.
cnt = count(df=df, spark=spark)
print(cnt)

2058


Test getNumberOfFields function

In [8]:
# Column count via the getNumberOfFields helper.
nof = getNumberOfFields(df=df, spark=spark)
print(nof)

11


Test getFieldNames function

In [9]:
# Column names via the getFieldNames helper.
fn = getFieldNames(df=df, spark=spark)
print(fn)

['_id', '_rev', 'count', 'flowrate', 'fluidlevel', 'frequency', 'hardness', 'speed', 'temperature', 'ts', 'voltage']


In [3]:
##################
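# try
# df.shape      -> raises AttributeError: Spark DataFrames have no .shape;
#                  the equivalent is (df.count(), len(df.columns))
# type(df)
# df.columns
# df.first()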

In [None]:
###############
# Show the first 7 temperature values in two ways. Without an ORDER BY, the
# rows returned are whatever Spark reads first.

# 1) Spark SQL against the "washing" temporary view (column aliased to ttt)
spark.sql('SELECT temperature as ttt FROM washing').show(n=7)

# 2) DataFrame API
df.select(df['temperature']).show(n=7)

In [None]:
# try the commands:
# Results are printed explicitly. In a notebook cell only the LAST bare
# expression is auto-displayed, and this cell ends with an assignment, so bare
# expressions here would be computed and silently discarded.
print(df.dtypes)                      # list of (column name, type) pairs
df.describe().show()                  # summary statistics per column
print(df.count())                     # number of rows
print(df.distinct().count())          # how many distinct rows
# Number of distinct temperatures. Unlike pandas nunique() (which drops NaN by
# default), Spark's distinct() keeps one null row, so null is counted as a value.
print(df.select('temperature').distinct().count())
# toPandas() collects the selected data to the driver as a pandas DataFrame.
a = df.select('temperature').toPandas()
print(type(a))
a = df.select(['temperature', 'speed']).toPandas()

In [None]:
######################
# Value counts of `speed`: group the rows by speed and count each group.
df.groupby('speed').count().show()
#############################
# Same counts, most frequent speed first (sort is an alias of orderBy).
df.groupby('speed').count().sort('count', ascending=False).show()

In [None]:
##########################
# Count missing values per column. A value counts as missing if it is null,
# or, for float/double columns, NaN.
#
# `count` is deliberately NOT imported by name: a bare `count` would replace
# the count(df, spark) helper defined earlier in this notebook, so re-running
# that cell afterwards would call the Spark aggregate instead.
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, col

# NaN can only occur in floating-point columns. Applying isnan only there avoids
# relying on implicit casts of other column types to double.
# NOTE(review): some types (e.g. dates/timestamps) may not cast at all.
_nan_capable = {'float', 'double'}

df.select([
    F.count(
        when(
            (isnan(c) | col(c).isNull()) if dtype in _nan_capable else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c, dtype in df.dtypes
]).show()

In [None]:
######################################
# Handling missing values with df.na (fill, drop, ...; type `df.na.` and use
# autocomplete to explore). These methods return a NEW DataFrame and never
# modify df in place, so reassign the result, e.g. df = df.na.drop().
# Filling with the number 888 only affects numeric columns; string columns
# such as fluidlevel keep their nulls.
df3 = df.na.fill(value=888)
df3.show()


In [None]:
# Rows whose speed is greater than 1000.
# filter() and its alias where() both take a Column condition.
df.where(df['speed'] > 1000).show()