In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the notebook's SparkSession, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [3]:
# Load the JSON file into a DataFrame; no schema is given, so Spark infers it.
df = spark.read.json(
    'people.json'
)

In [4]:
# Display the DataFrame contents as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
# Column names as a plain Python list.
df.columns

['age', 'name']

In [6]:
# Summary statistics per column: count, mean, stddev, min and max.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [7]:
from pyspark.sql.types import StructField,IntegerType,StringType,StructType

In [8]:
# Explicit column definitions: (column name, data type, nullable).
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

In [9]:
# Wrap the field list in a StructType so it can be handed to the reader as a schema.
final_schema = StructType(data_schema)

In [10]:
# Re-read the same file, this time applying the explicit schema instead of inferring one.
df = spark.read.json(
    'people.json',
    schema=final_schema,
)

In [11]:
# Confirm that age is now read as integer (it was inferred as long before).
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [13]:
# Register the DataFrame as a temporary view named 'people' so it can be queried with SQL.
df.createOrReplaceTempView('people')

In [20]:
# Query the 'people' view with plain SQL; the result is another DataFrame.
results = spark.sql(
    "SELECT * FROM people WHERE age > 10"
)

In [21]:
# Michael is absent from the result: his age is null, so it does not satisfy age > 10.
results.show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [44]:
# Read the stock price CSV: the first line holds the column names and the
# column types are inferred from the data.
dfop = spark.read.csv(
    'appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [23]:
# Check the column types that inferSchema=True detected.
dfop.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [25]:
# Peek at the first record. first() returns a single Row directly, so there is no
# need to fetch three rows with head(3) and then index into the resulting list.
dfop.first()

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [29]:
# Reference example (left disabled): filter with a SQL-style string predicate, then pick columns.
#dfop.filter("Close < 500").select(['Open','Close']).show()

In [33]:
# Reference example (left disabled): the same kind of filter written as a Column comparison.
#dfop.filter(dfop['Close'] < 500).select('Volume').show()

In [35]:
# To combine several conditions, wrap each comparison in its own parentheses
# and join them with the bitwise operators:
#     &  -> and        |  -> or        ~  -> not
# Python's own `and` keyword must not be used between Column comparisons, e.g.
#     dfop.filter(dfop['Close'] > 200 and dfop['Open'] < 200).show()   --> this does not work!
dfop.filter(
    (dfop['Close'] < 200)
    & (dfop['Open'] > 200)
).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [42]:
# To keep the filtered rows instead of just printing them, call collect():
# it returns the matching rows as a list.
result = dfop.filter(dfop['Low'] <= 100).collect()

# `result` now holds every matching row inside a list.

In [48]:
# Each element of the list is a Row; take the last one and convert it into a dictionary.
lastrow = result[-1].asDict()

In [50]:
# Look up a single field of the last row using its column name as the dictionary key.
lastrow['Volume']

56239800

In [3]:
# Load the sales data. NOTE: this rebinds `df`, which held the people data
# earlier in the notebook.
df = spark.read.csv(
    'sales_info.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Display the sales records.
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [5]:
# Sales is the only numeric column; Company and Person are strings.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [6]:
# groupBy returns a GroupedData object; the aggregation methods used below
# (sum, avg, count) are called on that object.
df.groupBy('Company')

<pyspark.sql.group.GroupedData at 0x7f46367c7588>

In [7]:
# Total per company; with no arguments, sum() covers the numeric columns (only Sales here).
df.groupBy('Company').sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [8]:
# Average Sales per company.
df.groupBy('Company').avg().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [9]:
# Number of rows per company.
df.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [14]:
# Not every aggregation needs a groupBy call: aggregate functions can be imported and applied to the whole DataFrame.
from pyspark.sql.functions import avg,countDistinct,stddev

In [15]:
# Average of Sales over the whole DataFrame (no grouping).
df.select(avg('Sales')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [17]:
# The aggregate function is passed inside select() so that it is applied to the column.
# Here: the number of distinct Sales values.
df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [18]:
# alias() sets the name of the output column.
df.select(stddev('Sales').alias('Alias name std')).show()

+------------------+
|    Alias name std|
+------------------+
|250.08742410799007|
+------------------+



In [21]:
# Formatting numbers: format_number takes a column and the number of decimal places to show.
from pyspark.sql.functions import format_number
sales_std = df.select(stddev('Sales').alias('std'))
sales_std.select(format_number('std',2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [22]:
# Use orderBy to sort the rows by a specific column.
# Passing just the column name sorts in ascending order.
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [23]:
# For descending order, call desc() on the Column object.
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [24]:
# Working with missing values: load a CSV that contains nulls.
dfn = spark.read.csv(
    'ContainsNull.csv',
    header=True,
    inferSchema=True,
)

In [25]:
# Display the data; missing entries appear as null.
dfn.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [26]:
# With no arguments, na.drop() removes every row that contains at least one null value.
dfn.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [27]:
# thresh is the minimum number of NON-null values a row needs in order to be kept:
# rows with fewer than `thresh` non-null values are dropped (here emp2, which only has Id).
dfn.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [28]:
# `how` selects the drop rule: 'any' drops a row if it has any null value,
# 'all' drops it only if every value in the row is null.
# drop(how='any'|'all')
dfn.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [30]:
# subset restricts the null check to the listed columns: rows with a null Sales are dropped.
dfn.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [31]:
# Filling in missing data.
# General form, without naming a column:
# a numeric fill value replaces the nulls in numeric columns only (Name is left untouched).
dfn.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [32]:
# A string fill value replaces the nulls in string columns only (Sales is left untouched).
dfn.na.fill('DEFAULT NAME').show()

+----+------------+-----+
|  Id|        Name|Sales|
+----+------------+-----+
|emp1|        John| null|
|emp2|DEFAULT NAME| null|
|emp3|DEFAULT NAME|345.0|
|emp4|       Cindy|456.0|
+----+------------+-----+



In [35]:
# Use subset to fill missing values only in specific columns.
dfn.na.fill('no name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|no name| null|
|emp3|no name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [39]:
# Filling with the mean value: first compute the mean of Sales and collect it
# back into Python as a list of rows.
from pyspark.sql.functions import mean
mean_val = dfn.select(mean(dfn['Sales'])).collect()

In [40]:
# collect() returned a list of rows: take the first row, then its first (and only)
# column, to get the mean as a plain number.
mean_sales = mean_val[0][0]
# Replace the nulls in Sales with that mean.
dfn.na.fill(mean_sales, subset=['Sales']).show()


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [43]:
# The same mean fill written as a single expression (harder to read than the two-step version).
dfn.na.fill(
    dfn.select(mean(dfn['Sales'])).collect()[0][0],
    subset=['Sales'],
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [46]:
# Select a subset of columns by passing a list of column names.
dfop.select(['Date','Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top 20 rows

In [47]:
# Import the date/time helper functions (plus the formatting helper).
from pyspark.sql.functions import dayofmonth,hour,dayofyear,month,year,weekofyear,format_number,date_format

In [49]:
# Date functions are passed inside select(), just like the other column functions.
# Here: the day of the month of each Date value.
dfop.select(dayofmonth(dfop['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [50]:
# Average closing price per year: first add a Year column derived from Date.
dfnew = dfop.withColumn("Year",year(dfop['Date']))

In [55]:
# Average the Close column within each year. Passing 'Close' to mean() aggregates
# only that column, instead of averaging every numeric column and then discarding
# all but one. The output columns are still 'Year' and 'avg(Close)'.
result = dfnew.groupBy('Year').mean('Close')

In [56]:
# Give the aggregated column a readable name.
new = result.withColumnRenamed('avg(Close)','Average Closing Price')

In [57]:
# Format the average to two decimal places and show it under a shorter column name.
new.select(['Year',format_number('Average Closing Price',2).alias('Avg Close')]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

