# Pyspark

In [1]:
from pyspark.sql import SparkSession

In [5]:
spark=SparkSession.builder.appName('Basic').getOrCreate()

In [6]:
df=spark.read.json('data/people.json')

In [7]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [10]:
df.summary()

DataFrame[summary: string, age: string, name: string]

# Data type

In [11]:
from pyspark.sql.types import (StructField,StructType,IntegerType,StringType)

In [13]:
data_schema=[StructField('age',IntegerType(),False),
             StructField('name',StringType(),False)
]

she=StructType(fields=data_schema)

In [15]:
df=spark.read.json('data/people.json',schema=she)

In [18]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [28]:
df['age']

Column<b'age'>

In [22]:
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [35]:
df.withColumn('he',df['age']*20).show()

+----+-------+----+
| age|   name|  he|
+----+-------+----+
|null|Michael|null|
|  30|   Andy| 600|
|  19| Justin| 380|
+----+-------+----+



In [36]:
df.createTempView('peo')

In [41]:
sql=spark.sql('select * from peo').show()


+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Filter 

In [62]:
dff=spark.read.csv('data/sales_info.csv',inferSchema=True,header=True)

In [63]:
dff

DataFrame[Company: string, Person: string, Sales: double]

In [64]:
dff.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [61]:
dff.filter(dff['_c2']<500).show()

+----+-------+---+
| _c0|    _c1|_c2|
+----+-------+---+
|GOOG|    Sam|200|
|GOOG|Charlie|120|
|GOOG|  Frank|340|
|MSFT|    Amy|124|
|MSFT|Vanessa|243|
|  FB|  Sarah|350|
|APPL|   John|250|
+----+-------+---+



In [66]:
dff.groupby(['company']).mean().show()

+-------+-----------------+
|company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [68]:
dff.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [108]:
dff.agg({'Sales':'std'}).collect()

[Row(stddev(Sales)=250.08742410799007)]

# Empty number

In [75]:
dfff=spark.read.csv('data/ContainsNull.csv',inferSchema=True,header=True)

In [77]:
dfff.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [82]:
d=dfff.na.drop()

In [83]:
d.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [111]:
from pyspark.sql.functions import mean

d=dfff.select(mean(dfff['Sales'])).collect()

In [115]:
dfff.fillna(d[0][0],subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Date and Time

In [120]:
dfz=spark.read.csv('data/appl_stock.csv',inferSchema=True,header=True)


In [121]:
dfz.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [128]:
dfz.head()

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [201]:
from pyspark.sql.functions import (dayofmonth,dayofweek,dayofyear,year,month,weekofyear,format_number)

In [136]:
dfz.select(dayofyear(dfz['Date'])).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+
only showing top 20 rows



In [164]:
yea=dfz.select(year(dfz['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [178]:
df_=dfz.withColumn('year',year(dfz['Date']))
df_.show()


+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|year|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.0600050000000

In [202]:
df_f=df_.groupBy('year').mean().select(['year',format_number('avg(Close)',2)])

In [203]:
df_f=df_f.withColumnRenamed('avg(Close)','avg_close')


In [204]:
df_f.show()

+----+----------------------------+
|year|format_number(avg(Close), 2)|
+----+----------------------------+
|2015|                      120.04|
|2013|                      472.63|
|2014|                      295.40|
|2012|                      576.05|
|2016|                      104.60|
|2010|                      259.84|
|2011|                      364.00|
+----+----------------------------+

