In [1]:
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pyspark_tutorial').getOrCreate()

In [7]:
# Random data generation

from pyspark.sql import SQLContext
from pyspark.sql.functions import rand,randn

sqlContext = SQLContext(spark)

# A SQLContext can be used create DataFrame,
# register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files.
# sqlContext is generally used in spark ver<2.0

df = sqlContext.range(0,7)
# df.show()

df.select("id",rand(seed=10).alias("uniformDistribution"),randn(seed=27).alias("normalDistribution")).show(3)

+---+-------------------+-------------------+
| id|uniformDistribution| normalDistribution|
+---+-------------------+-------------------+
|  0|0.03422639313807285|0.45800664187768786|
|  1| 0.3654625958161396|0.16420866768809156|
|  2| 0.4175019040792016|-1.0451987154313813|
+---+-------------------+-------------------+
only showing top 3 rows



In [8]:
# Using withColumn to create dataframe

from pyspark.sql.functions import rand
df1 = sqlContext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
df1.show()

+---+-------------------+-------------------+
| id|              rand1|              rand2|
+---+-------------------+-------------------+
|  0| 0.1709497137955568| 0.8894415403143504|
|  1|0.03422639313807285|  0.808152402708103|
|  2| 0.3654625958161396| 0.5956629641640896|
|  3| 0.4175019040792016| 0.2762195585886885|
|  4| 0.9899129399827472| 0.7043214981152885|
|  5|0.16452185994603707| 0.8378244793454831|
|  6|0.18141810315190554| 0.6340859398860077|
|  7|0.49595620559530806|0.49326717925251506|
|  8| 0.9697474945375325| 0.5711276669097988|
|  9|0.07530606222259384| 0.7049616255180481|
+---+-------------------+-------------------+



In [9]:
from pyspark.sql.functions import when

df3 = spark.range(0,7).withColumn("isTrue",when(rand()>0.5,1).otherwise(0))
df3.show()

+---+------+
| id|isTrue|
+---+------+
|  0|     0|
|  1|     1|
|  2|     1|
|  3|     0|
|  4|     1|
|  5|     1|
|  6|     1|
+---+------+



In [5]:
from pyspark.sql import *

In [6]:
Student = Row("firstname","lastname","age","telephone")

s1 = Student('Walter','White','47',348765)
s2 = Student('Jesse','Pinkman','27',657332)

StudentData = [s1,s2]

In [8]:
df = spark.createDataFrame(StudentData)
df.show()

+---------+--------+---+---------+
|firstname|lastname|age|telephone|
+---------+--------+---+---------+
|   Walter|   White| 47|   348765|
|    Jesse| Pinkman| 27|   657332|
+---------+--------+---+---------+



In [9]:
df.filter(df.age == 27).show()

+---------+--------+---+---------+
|firstname|lastname|age|telephone|
+---------+--------+---+---------+
|    Jesse| Pinkman| 27|   657332|
+---------+--------+---+---------+



In [11]:
data = spark.read.csv("walmart_stock.csv",header=True)
data.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [12]:
data.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [19]:
data.describe('High','Low','Volume').show()

+-------+-----------------+-----------------+-----------------+
|summary|             High|              Low|           Volume|
+-------+-----------------+-----------------+-----------------+
|  count|             1258|             1258|             1258|
|   mean|72.83938807631165| 71.9186009594594|8222093.481717011|
| stddev|6.768186808159218|6.744075756255496|  4519780.8431556|
|    min|        57.060001|        56.299999|         10010500|
|    max|        90.970001|            89.25|          9994400|
+-------+-----------------+-----------------+-----------------+



In [28]:
df = spark.read.json('people.json')

In [29]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:
df.columns

['age', 'name']

In [35]:
# names = df.select('name').collect()
names[0].name

'Michael'

In [37]:
# converting dataframe to py list

name_list = [row.name for row in names]
name_list

['Michael', 'Andy', 'Justin']

In [9]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [7]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [15]:
from pyspark.sql.types import (StructField,StringType,
                               StructType,IntegerType)

In [11]:
# creating structure of our own datatype
data_schema = [StructField('age',IntegerType(),True),
              StructField('name',StringType(),True)]

In [13]:
final_struct = StructType(fields=data_schema)

In [14]:
df = spark.read.json('people.json',schema=final_struct)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [19]:
# to get the values of a column
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [24]:
# to get first three rows of dataframe
df.head(3)

# df.head(3)[1]

Row(age=30, name='Andy')

In [16]:
# converting list to a dataframe
d = ['Texas','California','New Jersey']

_df = spark.createDataFrame(d,StringType())
_df.show()

+----------+
|     value|
+----------+
|     Texas|
|California|
|New Jersey|
+----------+



In [38]:
# joining list to a dataframe

d = ['Texas','California','New Jersey']

myschema = StructType([
    StructField("name_list",StringType(),True),
    StructField("d",StringType(),True)
])


df_1 = spark.createDataFrame(zip(name_list,d),schema=myschema)
df_1.show()

+---------+----------+
|name_list|         d|
+---------+----------+
|  Michael|     Texas|
|     Andy|California|
|   Justin|New Jersey|
+---------+----------+



In [39]:
df_new = df.join(df_1,df.name==df_1.name_list,how='left').drop(df_1.name_list)
df_new.show()

+----+-------+----------+
| age|   name|         d|
+----+-------+----------+
|null|Michael|     Texas|
|  30|   Andy|California|
|  19| Justin|New Jersey|
+----+-------+----------+



In [15]:
# adding column with constant values to a dataframe
from pyspark.sql.functions import *


df.withColumn('country',lit('USA')).show()

+----+-------+-------+
| age|   name|country|
+----+-------+-------+
|null|Michael|    USA|
|  30|   Andy|    USA|
|  19| Justin|    USA|
+----+-------+-------+



In [18]:
df.createOrReplaceTempView('employee')

In [31]:
result = spark.sql('SELECT age FROM employee')
result.show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [34]:
# converting spark dataframe into pandas df

spark.conf.set("spark.sql.execution.arrow.enabled","true")

pandas_df = df.select("*").toPandas()
pandas_df

  PyArrow >= 0.15.1 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,age,name
0,,Michael
1,30.0,Andy
2,19.0,Justin


In [35]:
data = spark.read.csv('walmart_stock.csv',inferSchema=True,header=True)

In [36]:
data.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [37]:
data.head(2)

[Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)]

In [45]:
# data.filter("Close<60 and Open<60").select(['Open','Close']).show(3)

# or

data.filter( (data['close']<60) & (data['open']<70) ).select(['open','close','Volume']).show(3)

+------------------+------------------+--------+
|              open|             close|  Volume|
+------------------+------------------+--------+
|60.209998999999996|59.709998999999996| 9593300|
|         59.349998|         59.419998|12768200|
|         59.419998|              59.0| 8069400|
+------------------+------------------+--------+
only showing top 3 rows



In [46]:
sales_df = spark.read.csv('sales_info.csv',inferSchema=True,header=True)
sales_df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [48]:
sales_df.show(2)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+
only showing top 2 rows



In [47]:
sales_df.describe('Sales').show()

+-------+------------------+
|summary|             Sales|
+-------+------------------+
|  count|                12|
|   mean| 360.5833333333333|
| stddev|250.08742410799007|
|    min|             120.0|
|    max|             870.0|
+-------+------------------+



In [49]:
sales_df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [50]:
sales_df.groupBy('Company').max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [51]:
sales_df.agg({'Sales':'Sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [53]:
# sales_df.orderBy('Sales').show()  ##### ascending

sales_df.orderBy(sales_df['Sales'].desc()).show()    #### descending

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [4]:
# Working with null values


df = spark.read.csv('ContainsNull.csv',header=True,inferSchema=True)
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [5]:
df.na.drop(thresh=2).show()     ##### thresh=2 means there must be atleast 2
# not null values in each row

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [6]:
df.na.drop(subset=['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp4|Cindy|456.0|
+----+-----+-----+



In [10]:
from pyspark.sql.functions import mean

mean_sales = df.select(mean(df['Sales'])).collect()
mean_sales = mean_sales[0][0]
                       
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
# working with date and timestamp

apple_df = spark.read.csv('appl_stock.csv',header=True,inferSchema=True)
apple_df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [13]:
apple_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [15]:
# converting str type to date
# from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import functions as F

apple_df = apple_df.withColumn('new_date',F.to_date(F.unix_timestamp('Date',
'yyyy-MM-dd').cast('timestamp')))
apple_df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|  new_date|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010-01-04|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010-01-05|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010-01-06|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010-01-07|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700

In [16]:
apple_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- new_date: date (nullable = true)



In [17]:
from pyspark.sql.functions import month,dayofmonth,year

apple_df.withColumn('Year',year(apple_df['new_date'])).select(['new_date','Year']).show(3)

+----------+----+
|  new_date|Year|
+----------+----+
|2010-01-04|2010|
|2010-01-05|2010|
|2010-01-06|2010|
+----------+----+
only showing top 3 rows



In [40]:
spark.stop()