In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build the Spark session used by the cells below; getOrCreate() reuses a
# session if one already exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [3]:
# Load people.json into a DataFrame. No schema is passed, so the column types
# are whatever Spark infers from the file.
df = spark.read.json('people.json')

In [4]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
df.columns

['age', 'name']

In [7]:
# describe() returns a new DataFrame of summary statistics; on its own the cell
# only echoes that DataFrame's repr, so .show() is needed to see the numbers.
df.describe()

DataFrame[summary: string, age: string, name: string]

In [8]:
# Display the summary statistics. The count skips nulls, which is why age
# reports 2 while name reports 3.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



# Show column types

In [9]:
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)

In [10]:
# Explicit schema: read age as an integer (inference produced long) and name
# as a string. The third argument marks each field as nullable.
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True),
]

In [11]:
# Wrap the list of fields in a StructType so it can be passed as a schema.
final_struc = StructType(data_schema)

In [12]:
# Re-read the same file, this time applying the explicit schema instead of
# letting Spark infer one. This rebinds df, replacing the inferred-schema frame.
df = spark.read.json('people.json', schema=final_struc)

In [13]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [14]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Part 2

In [15]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [16]:
# head(2) returns a plain Python list of Row objects, so a single row is
# picked out with ordinary list indexing.
df.head(2)[0]

Row(age=None, name='Michael')

In [17]:
# Indexing a DataFrame by name gives a Column object, not the column's data.
# NOTE(review): the schema spells this column 'age'; 'Age' resolves here only
# because Spark matches column names case-insensitively by default — confirm
# the setting, or use 'age' to match the schema.
type(df['Age'])

pyspark.sql.column.Column

In [18]:
df.select('Age')

DataFrame[Age: int]

In [19]:
df.select('Age').show()

+----+
| Age|
+----+
|null|
|  30|
|  19|
+----+



In [20]:
# Select several columns at once by passing each name as its own argument.
df.select('Age', 'name').show()

+----+-------+
| Age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Add a new column

In [21]:
# withColumn returns a new DataFrame with the extra column appended. The result
# is only displayed, not assigned, so df itself keeps just age and name.
df.withColumn('newage', df['age']).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [22]:
# Derive a column from an arithmetic expression on an existing column.
# Rows with a null age stay null in the derived column.
df.withColumn(
    'double_age',
    df['age'] * 2,
).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



# Rename column

In [23]:
df.withColumnRenamed('age', 'my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [25]:
# Rename several columns by chaining calls; each call returns a new DataFrame
# with one column renamed.
(
    df
    .withColumnRenamed('age', 'ages')
    .withColumnRenamed('name', 'Names')
    .show()
)

+----+-------+
|ages|  Names|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Use SQL queries

# Create a temporary view called 'people' from the DataFrame 'df'

In [26]:
# Register df under the name 'people' so that SQL text passed to spark.sql()
# can refer to it as a table.
df.createOrReplaceTempView('people')

In [30]:
# spark.sql() runs the query against the registered 'people' view; the result
# is displayed with .show() just like df.
results = spark.sql("SELECT * FROM people")

results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [29]:
# Same view, now with a WHERE clause; only the row whose age equals 30 comes
# back (the null-age row does not match the comparison).
new_results = spark.sql("SELECT * FROM people WHERE age = 30")

new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



# Spark Dataframe Basic Operations

In [31]:
from pyspark.sql import SparkSession

In [32]:
# NOTE(review): a session named 'Basics' was already created earlier in this
# notebook. If that session is still alive in the kernel, getOrCreate() hands
# it back and the 'DF_Basics' app name is presumably not applied — confirm, or
# stop the earlier session first if a separately named app is required.
spark = SparkSession.builder.appName('DF_Basics').getOrCreate()

In [33]:
# Load the stock prices. header=True takes column names from the first line
# and inferSchema=True lets Spark work out each column's type.
# This rebinds df, replacing the people DataFrame used above.
df = spark.read.csv(
    'appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [34]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [35]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [36]:
df.head(3)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]

In [37]:
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [38]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

# SQL Syntax

In [40]:
# filter() accepts a condition written as a SQL expression string.
df.filter("Close < 500").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [41]:
# Filter with a SQL condition string, then keep only the columns of interest.
(
    df
    .filter("Close < 500")
    .select('Date', 'Close', 'Low', 'High')
    .show()
)

+-------------------+------------------+------------------+------------------+
|               Date|             Close|               Low|              High|
+-------------------+------------------+------------------+------------------+
|2010-01-04 00:00:00|        214.009998|212.38000099999996|        214.499996|
|2010-01-05 00:00:00|        214.379993|        213.249994|        215.589994|
|2010-01-06 00:00:00|        210.969995|        210.750004|            215.23|
|2010-01-07 00:00:00|            210.58|        209.050005|        212.000006|
|2010-01-08 00:00:00|211.98000499999998|209.06000500000002|        212.000006|
|2010-01-11 00:00:00|210.11000299999998|        208.450005|        213.000002|
|2010-01-12 00:00:00|        207.720001|        206.419998|209.76999500000002|
|2010-01-13 00:00:00|        210.650002|        204.099998|210.92999500000002|
|2010-01-14 00:00:00|            209.43|        209.020004|210.45999700000002|
|2010-01-15 00:00:00|            205.93|        205.

# PySpark (Python) Syntax

In [42]:
# The same filter expressed as a Column comparison instead of a SQL string.
df.filter(df['Close'] < 500).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [43]:
# Column-expression filter followed by a projection onto Low and High.
(
    df
    .filter(df['Close'] < 500)
    .select('Low', 'High')
    .show()
)

+------------------+------------------+
|               Low|              High|
+------------------+------------------+
|212.38000099999996|        214.499996|
|        213.249994|        215.589994|
|        210.750004|            215.23|
|        209.050005|        212.000006|
|209.06000500000002|        212.000006|
|        208.450005|        213.000002|
|        206.419998|209.76999500000002|
|        204.099998|210.92999500000002|
|        209.020004|210.45999700000002|
|        205.869999|211.59999700000003|
|        207.240004|215.18999900000003|
|        209.500002|        215.549994|
|        207.210003|213.30999599999998|
|            197.16|        207.499996|
|        200.190002|        204.699999|
|        202.580004|        213.710005|
|        199.530001|            210.58|
|        198.699995|        205.500004|
|        190.250002|        202.199995|
|191.29999899999999|             196.0|
+------------------+------------------+
only showing top 20 rows



In [44]:
# Combine multiple conditions with &, | and ~ (and, or, not) rather than
# Python's and/or/not keywords. Each comparison must sit in its own parentheses
# because & and | bind more tightly than < and > in Python.
df.filter( (df['Close'] < 200) & (df['Open'] > 200) ).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [45]:
# ~ negates the second condition: keep rows where Close is below 200 and Open
# is not above 200.
df.filter( (df['Close'] < 200) & ~ (df['Open'] > 200) ).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03 00:00:00|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04 00:00:00|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05 00:00:00|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.3

In [47]:
# Same negated filter as the previous cell, narrowed to the Low and High columns.
(
    df
    .filter((df['Close'] < 200) & ~(df['Open'] > 200))
    .select('Low', 'High')
    .show()
)

+------------------+------------------+
|               Low|              High|
+------------------+------------------+
|191.29999899999999|             196.0|
|193.37999299999998|        196.319994|
|        194.420004|        200.200003|
|        191.570005|        198.370001|
|        190.850002|             196.0|
|        193.999994|197.88000300000002|
|        194.749998|        197.499994|
|            194.26|             196.6|
|194.05999599999998|        199.750006|
|        195.709993|        201.330002|
|             91.75|         93.879997|
|             93.57|         95.050003|
|         93.470001|         94.760002|
|         91.900002|         94.120003|
|         90.879997|         92.440002|
|         91.449997|             92.75|
|         91.800003|         92.699997|
|         91.349998|         92.290001|
|         91.339996|         92.300003|
|         90.900002|         92.550003|
+------------------+------------------+
only showing top 20 rows



In [48]:
# Exact equality against a double column. It finds the 2010-01-22 row here.
# NOTE(review): exact float matches are fragile for values that are computed
# rather than read verbatim; a small tolerance may be safer — confirm intent.
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [49]:
# collect() returns the matching rows as a Python list of Row objects, so the
# result can be kept in a variable and used with ordinary Python code.
result = df.filter(df['Low'] == 197.16).collect()

result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [50]:
result[0]

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [51]:
row = result[0]

row

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [52]:
# asDict() converts the Row into a regular dict keyed by column name.
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [53]:
type(row)

pyspark.sql.types.Row

In [54]:
new_row = row.asDict()

new_row

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [55]:
type(new_row)

dict

In [56]:
# new_row is a plain dict, so a single field is read with a normal key lookup.
new_row['Volume']

220441900