In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Jerry_Test").getOrCreate()

In [6]:
df = spark.read.json("/Users/jerry/spark-2.4.6-bin-hadoop2.7/python/pyspark_learning/datasets/people.json")

In [7]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
df.columns

['age', 'name']

In [11]:
# Cannot correctly run when the Java version is to high
# So I will skip this
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



### We can specify the type when importing data

In [12]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType

In [13]:
data_schema = [StructField('age',IntegerType(),True),
               StructField('name',StringType(),True)]

In [14]:
final_struct = StructType(fields=data_schema)

In [15]:
struct_df = spark.read.json('/Users/jerry/spark-2.4.6-bin-hadoop2.7/python/pyspark_learning/datasets/people.json',
                            schema=final_struct)

In [16]:
struct_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [19]:
type(df['age'])

pyspark.sql.column.Column

In [21]:
df.select("age").show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [24]:
df.head(n=2)[1]

Row(age=30, name='Andy')

In [26]:
# Select multi columns
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [30]:
# Add new column
df.withColumn(colName='dup_age',col=df['age']).show()

# can also do some operations when adding columns
df.withColumn(colName='double_age',col=df['age']*2).show()


+----+-------+-------+
| age|   name|dup_age|
+----+-------+-------+
|null|Michael|   null|
|  30|   Andy|     30|
|  19| Justin|     19|
+----+-------+-------+

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [31]:
# Rename
df.withColumnRenamed(existing='age',new='new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



In [32]:
# Spark SQL
df.createOrReplaceTempView('people')

In [36]:
results  = spark.sql("SELECT * FROM people WHERE name LIKE '%nd%' ")

In [37]:
results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



### Spark basic Datafram operations

In [39]:
stock_df = spark.read.csv('/Users/jerry/spark-2.4.6-bin-hadoop2.7/python/pyspark_learning/datasets/appl_stock.csv',
                          inferSchema=True,header=True)

In [40]:
stock_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [44]:
stock_df.head(2)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)]

In [47]:
# The SQL way to filter data
stock_df.filter("Close<1").show()

+----+----+----+---+-----+------+---------+
|Date|Open|High|Low|Close|Volume|Adj Close|
+----+----+----+---+-----+------+---------+
+----+----+----+---+-----+------+---------+



In [50]:
# Select several columns by using select
stock_df.filter("Close<1").select(['Open','Low']).show()

+----+---+
|Open|Low|
+----+---+
+----+---+



In [55]:
# The dataframe python way
stock_df.filter(stock_df['Close'] < 92).show()

+-------------------+---------+-----------------+---------+-----------------+---------+-----------------+
|               Date|     Open|             High|      Low|            Close|   Volume|        Adj Close|
+-------------------+---------+-----------------+---------+-----------------+---------+-----------------+
|2014-06-13 00:00:00|92.199997|        92.440002|90.879997|        91.279999| 54525000|        86.610132|
|2014-06-19 00:00:00|92.290001|        92.300003|91.339996|        91.860001| 35528000|        87.160461|
|2014-06-20 00:00:00|91.849998|        92.550003|90.900002|        90.910004|100898000|        86.259066|
|2014-06-23 00:00:00|    91.32|        91.620003|90.599998|90.83000200000001| 43694000|        86.183157|
|2014-06-24 00:00:00|    90.75|        91.739998|90.190002|        90.279999| 39036000|        85.661292|
|2014-06-25 00:00:00|90.209999|        90.699997|89.650002|        90.360001| 36869000|        85.737201|
|2014-06-26 00:00:00|90.370003|        91.0500

In [56]:
# what if we have multiple conditions/filters?

# Below will raise an error
# stock_df.filter(stock_df['Close'] < 92 and stock_df['Open'] > 200).show()

# this is the correct way, need to use & to replace AND, | to replace OR
# and remember to seperate them
stock_df.filter( (stock_df['Close'] < 92) & (stock_df['Open'] > 200) ).show()

# ~ refers to the NOT operator, e.g. ( ~stock_df['Open'] > 200 ) means not large than 200

+----+----+----+---+-----+------+---------+
|Date|Open|High|Low|Close|Volume|Adj Close|
+----+----+----+---+-----+------+---------+
+----+----+----+---+-----+------+---------+



In [59]:
# Use collect() function to manipulte the data later
# this will return the ROW object
stock_df.filter(stock_df['Low'] == 197.16).collect()

collect_results = stock_df.filter(stock_df['Low'] == 197.16).collect()


In [60]:
row = collect_results[0]

In [61]:
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [62]:
row.asDict()['High']

207.499996

### Spark Dataframes GroupBy and Aggreagate

In [63]:
# Checking pyspark.sql module documentation for more details
df = spark.read.csv('/Users/jerry/spark-2.4.6-bin-hadoop2.7/python/pyspark_learning/datasets/sales_info.csv',
                     inferSchema=True,header=True)

In [65]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [66]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [67]:
# GroupBy
# Not all methods need a groupBy call
df.groupBy("Company")

<pyspark.sql.group.GroupedData at 0x7f9ac438af10>

In [69]:
df.groupBy("Company").max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [71]:
# This will calculate the total numbers
# agg function needs a dict input
df.agg({'Sales':'sum'}).show()
df.agg({'Sales':'max'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [72]:
group_data = df.groupBy("Company")

In [74]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [75]:
# Pyspark functions
from pyspark.sql.functions import countDistinct,avg,stddev

In [76]:
df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [78]:
df.select(avg('Sales').alias('something else')).show()

+-----------------+
|   something else|
+-----------------+
|360.5833333333333|
+-----------------+



In [80]:
df.select(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [79]:
from pyspark.sql.functions import format_number

In [83]:
sales_std = df.select(stddev('Sales').alias('stddev'))

In [86]:
# This format_number function can control how many decimal numbers you want to see
sales_std.select(format_number('stddev',3).alias('stddev_for_sales')).show()

+----------------+
|stddev_for_sales|
+----------------+
|         250.087|
+----------------+



In [87]:
# Sort
# The default is ASC
df.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [88]:
# If you want to do DESC
# need to pass the actual columns here
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

