<a href="https://colab.research.google.com/github/JarekMaleszyk/data-science-project-sandbox/blob/main/apache_spark_with_python_basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [194]:
try:
  import pyspark
except:
  !pip install pyspark
  import pyspark
finally:
  from pyspark.sql import SparkSession

1. Simple JSON file

In [195]:
import os.path
FILE_PATH = "/content/jsondata/data.json"

if not os.path.isfile(FILE_PATH):
  !rm -rf /content/jsondata
  !wget -P "/content/jsondata/" "https://raw.githubusercontent.com/JarekMaleszyk/data-science-project-sandbox/refs/heads/main/data.json"

In [196]:
# Start a Spark session for this notebook, or reuse the one already running in the kernel.
sparkSession = (
    SparkSession.builder
    .appName('Pratice with pyspark basics')
    .getOrCreate()
)

In [197]:
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType,
                               ArrayType, DoubleType, BooleanType)

# Explicit schema for data.json, built field by field.
# The third argument of add() is the nullable flag; every column allows nulls.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("category", StringType(), True)
    .add("quantity", IntegerType(), True)
    .add("price", DoubleType(), True)
)

In [198]:
# Load the product catalogue with the explicit schema instead of inferring types.
# "multiline" lets the reader parse a JSON document that spans several lines.
pyspark_df_products = (
    sparkSession.read
    .schema(schema)
    .option("multiline", True)
    .json(FILE_PATH)
)

In [199]:
# Confirm that the DataFrame carries the explicitly declared column types.
pyspark_df_products.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



In [200]:
# Preview the loaded products as a text table.
pyspark_df_products.show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
+---+--------------------+---------------+--------+------+



2. More complex JSON file

In [201]:
USER_FILE_PATH = "/content/jsondata/users.json"

if not os.path.isfile(USER_FILE_PATH):
  !rm -f /content/jsondata/users.json
  !wget -P "/content/jsondata/" "https://jsonplaceholder.typicode.com/users"
  !mv /content/jsondata/users /content/jsondata/users.json

In [202]:
# Load the nested users document and let Spark work out the schema.
# NOTE(review): "inferSchema" looks like a CSV-reader option; the JSON reader
# presumably infers the schema whenever none is supplied — confirm before dropping it.
pyspark_df_users = (
    sparkSession.read
    .option("inferSchema", True)
    .option("multiline", True)
    .json(USER_FILE_PATH)
)

In [203]:
# Inspect the inferred schema, including the nested address and company structs.
pyspark_df_users.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- lat: string (nullable = true)
 |    |    |-- lng: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- suite: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- bs: string (nullable = true)
 |    |-- catchPhrase: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- username: string (nullable = true)
 |-- website: string (nullable = true)



In [204]:
# Preview the users; long values and struct columns are truncated in the table.
pyspark_df_users.show()

+--------------------+--------------------+--------------------+---+--------------------+--------------------+----------------+-------------+
|             address|             company|               email| id|                name|               phone|        username|      website|
+--------------------+--------------------+--------------------+---+--------------------+--------------------+----------------+-------------+
|{Gwenborough, {-3...|{harness real-tim...|   Sincere@april.biz|  1|       Leanne Graham|1-770-736-8031 x5...|            Bret|hildegard.org|
|{Wisokyburgh, {-4...|{synergize scalab...|   Shanna@melissa.tv|  2|        Ervin Howell| 010-692-6593 x09125|       Antonette|anastasia.net|
|{McKenziehaven, {...|{e-enable strateg...|  Nathan@yesenia.net|  3|    Clementine Bauch|      1-463-123-4447|        Samantha|  ramiro.info|
|{South Elvis, {29...|{transition cutti...|Julianne.OConner@...|  4|    Patricia Lebsack|   493-170-9623 x156|        Karianne|     kale.biz|
|{Rosc

In [205]:
# List the top-level column names.
print(pyspark_df_users.columns)
# Summary statistics (count, mean, stddev, min, max) for the non-struct columns.
# show() prints the table itself and returns None, so wrapping it in print()
# would only add a stray "None" line after the table.
pyspark_df_users.describe().show()

['address', 'company', 'email', 'id', 'name', 'phone', 'username', 'website']
+-------+--------------------+------------------+----------------+-----------------+---------+-----------+
|summary|               email|                id|            name|            phone| username|    website|
+-------+--------------------+------------------+----------------+-----------------+---------+-----------+
|  count|                  10|                10|              10|               10|       10|         10|
|   mean|                NULL|               5.5|            NULL|             NULL|     NULL|       NULL|
| stddev|                NULL|3.0276503540974917|            NULL|             NULL|     NULL|       NULL|
|    min|Chaim_McDermott@d...|                 1|Chelsey Dietrich|    (254)954-1289|Antonette|ambrose.net|
|    max|Telly.Hoeger@bill...|                10|Patricia Lebsack|586.493.6943 x140| Samantha|ramiro.info|
+-------+--------------------+------------------+----------------+

3. Working with data

In [206]:
# Project a subset of columns; the struct column address is kept as a single column.
pyspark_df_users.select(['address', 'id', 'name']).show()

+--------------------+---+--------------------+
|             address| id|                name|
+--------------------+---+--------------------+
|{Gwenborough, {-3...|  1|       Leanne Graham|
|{Wisokyburgh, {-4...|  2|        Ervin Howell|
|{McKenziehaven, {...|  3|    Clementine Bauch|
|{South Elvis, {29...|  4|    Patricia Lebsack|
|{Roscoeview, {-31...|  5|    Chelsey Dietrich|
|{South Christy, {...|  6|Mrs. Dennis Schulist|
|{Howemouth, {24.8...|  7|     Kurtis Weissnat|
|{Aliyaview, {-14....|  8|Nicholas Runolfsd...|
|{Bartholomebury, ...|  9|     Glenna Reichert|
|{Lebsackbury, {-3...| 10|  Clementina DuBuque|
+--------------------+---+--------------------+



In [207]:
# head(2) returns the first two rows as a list; [0] picks the first of them.
pyspark_df_users.head(2)[0] # <= Row object

Row(address=Row(city='Gwenborough', geo=Row(lat='-37.3159', lng='81.1496'), street='Kulas Light', suite='Apt. 556', zipcode='92998-3874'), company=Row(bs='harness real-time e-markets', catchPhrase='Multi-layered client-server neural-net', name='Romaguera-Crona'), email='Sincere@april.biz', id=1, name='Leanne Graham', phone='1-770-736-8031 x56442', username='Bret', website='hildegard.org')

In [208]:
# The result is only displayed, not assigned, so pyspark_df_users itself is unchanged.
pyspark_df_users.withColumn("phone_number", pyspark_df_users['phone']).show() # new column holding a copy of phone

+--------------------+--------------------+--------------------+---+--------------------+--------------------+----------------+-------------+--------------------+
|             address|             company|               email| id|                name|               phone|        username|      website|        phone_number|
+--------------------+--------------------+--------------------+---+--------------------+--------------------+----------------+-------------+--------------------+
|{Gwenborough, {-3...|{harness real-tim...|   Sincere@april.biz|  1|       Leanne Graham|1-770-736-8031 x5...|            Bret|hildegard.org|1-770-736-8031 x5...|
|{Wisokyburgh, {-4...|{synergize scalab...|   Shanna@melissa.tv|  2|        Ervin Howell| 010-692-6593 x09125|       Antonette|anastasia.net| 010-692-6593 x09125|
|{McKenziehaven, {...|{e-enable strateg...|  Nathan@yesenia.net|  3|    Clementine Bauch|      1-463-123-4447|        Samantha|  ramiro.info|      1-463-123-4447|
|{South Elvis, {29...|

In [209]:
# The renamed DataFrame is only displayed, not assigned, so later cells still see "phone".
pyspark_df_users.withColumnRenamed("phone", "phone_number").show() # rename a column

+--------------------+--------------------+--------------------+---+--------------------+--------------------+----------------+-------------+
|             address|             company|               email| id|                name|        phone_number|        username|      website|
+--------------------+--------------------+--------------------+---+--------------------+--------------------+----------------+-------------+
|{Gwenborough, {-3...|{harness real-tim...|   Sincere@april.biz|  1|       Leanne Graham|1-770-736-8031 x5...|            Bret|hildegard.org|
|{Wisokyburgh, {-4...|{synergize scalab...|   Shanna@melissa.tv|  2|        Ervin Howell| 010-692-6593 x09125|       Antonette|anastasia.net|
|{McKenziehaven, {...|{e-enable strateg...|  Nathan@yesenia.net|  3|    Clementine Bauch|      1-463-123-4447|        Samantha|  ramiro.info|
|{South Elvis, {29...|{transition cutti...|Julianne.OConner@...|  4|    Patricia Lebsack|   493-170-9623 x156|        Karianne|     kale.biz|
|{Rosc

In [210]:
# Expose the users DataFrame to Spark SQL under the name df_users, then query it.
# Nested struct fields are addressed with dot notation (address.city).
pyspark_df_users.createOrReplaceTempView("df_users")  # register (or replace) the temporary view
results = sparkSession.sql( """
                              SELECT
                                 address.city
                                ,address.street
                                ,email
                                ,phone
                                ,website
                                ,id
                              FROM df_users
                              WHERE id = 1
                            """)                      # the query result is itself a DataFrame

In [211]:
# Display the rows returned by the SQL query.
results.show()

+-----------+-----------+-----------------+--------------------+-------------+---+
|       city|     street|            email|               phone|      website| id|
+-----------+-----------+-----------------+--------------------+-------------+---+
|Gwenborough|Kulas Light|Sincere@april.biz|1-770-736-8031 x5...|hildegard.org|  1|
+-----------+-----------+-----------------+--------------------+-------------+---+



In [212]:
# Same result as the SQL query, expressed with the DataFrame API.
(
    pyspark_df_users
    .filter(pyspark_df_users['id'] == 1)
    .select(['address.city', 'address.street', 'email', 'phone', 'website', 'id'])
    .show()
)

+-----------+-----------+-----------------+--------------------+-------------+---+
|       city|     street|            email|               phone|      website| id|
+-----------+-----------+-----------------+--------------------+-------------+---+
|Gwenborough|Kulas Light|Sincere@april.biz|1-770-736-8031 x5...|hildegard.org|  1|
+-----------+-----------+-----------------+--------------------+-------------+---+



In [213]:
# Keep products that cost more than 20.99 AND have a quantity of at least 20.
# Each comparison needs its own parentheses because & binds tighter than > and >=.
pyspark_df_products.filter(
    (pyspark_df_products.price > 20.99)
    & (pyspark_df_products.quantity >= 20)
).show()

+---+---------------+--------+--------+------+
| id|           name|category|quantity| price|
+---+---------------+--------+--------+------+
|  2|Nike Air Max 90|Clothing|      25|119.99|
|  6|       Yoga Mat|  Sports|      30| 29.99|
+---+---------------+--------+--------+------+



In [214]:
# Keep products that cost more than 20.99 OR whose quantity is NOT at least 20.
# ~ negates a column condition, | combines two conditions with a logical or.
pyspark_df_products.filter(
    (pyspark_df_products.price > 20.99)
    | ~(pyspark_df_products.quantity >= 20)
).show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
+---+--------------------+---------------+--------+------+



In [215]:
# Per-category averages, collected as a list of Row objects.
result = pyspark_df_products.groupBy('category').mean().collect() # mean() without arguments averages every numeric column, even id

In [216]:
# Take the first collected group and view it as a plain Python dict.
# NOTE(review): the order of groupBy results is presumably not guaranteed, so the
# category found at index 0 may differ between runs — confirm before relying on it.
row = result[0]
row_dict = row.asDict()
print(row_dict)

{'category': 'Sports', 'avg(id)': 6.0, 'avg(quantity)': 30.0, 'avg(price)': 29.99}


In [217]:
# A Row can be indexed by field name directly, without converting it to a dict first.
row['avg(quantity)']

30.0

In [220]:
pyspark_df_products.groupBy('category').mean('quantity').show() # average only the quantity column per category

+---------------+-------------+
|       category|avg(quantity)|
+---------------+-------------+
|         Sports|         30.0|
|    Electronics|          9.0|
|       Clothing|         20.0|
|          Books|         35.0|
|Home Appliances|          4.0|
|         Beauty|        100.0|
+---------------+-------------+



In [220]:
# Per-category aggregates of quantity, each collected as a list of Row objects.
# The results deliberately do not use the names sum/min/max: assigning to those
# would shadow the Python builtins for every later cell in this kernel session.
quantity_sum = pyspark_df_products.groupBy('category').sum('quantity').collect()
quantity_min = pyspark_df_products.groupBy('category').min('quantity').collect()
quantity_max = pyspark_df_products.groupBy('category').max('quantity').collect()
cnt = pyspark_df_products.groupBy('category').count().collect()

In [226]:
from pyspark.sql.functions import countDistinct, avg, stddev
# Number of distinct quantity values within each category, collected as a list of rows.
unique_cnt = pyspark_df_products.groupBy('category').agg(countDistinct('quantity')).collect()

In [229]:
# Aggregate over the whole DataFrame (no grouping) using the {column: function} dict form.
pyspark_df_products.agg({'quantity': 'max'}).show()

+-------------+
|max(quantity)|
+-------------+
|          100|
+-------------+



In [231]:
# Same idea with a function from pyspark.sql.functions; alias() gives the result column a readable name.
pyspark_df_products.select(avg('quantity').alias('avg_quantity')).show()

+------------+
|avg_quantity|
+------------+
|        26.6|
+------------+



In [239]:
from pyspark.sql.functions import format_number

# Standard deviation of quantity, rendered with two decimal places.
# The same column name is used for the raw and for the formatted value.
col_name = 'stddev_quantity'
(
    pyspark_df_products
    .select(stddev('quantity').alias(col_name))
    .select(format_number(col_name, 2).alias(col_name))
    .show()
)

+---------------+
|stddev_quantity|
+---------------+
|          29.39|
+---------------+



In [244]:
# Sort by quantity, largest first.
pyspark_df_products.orderBy(pyspark_df_products['quantity'].desc()).show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  1|           iPhone 12|    Electronics|      10|899.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
+---+--------------------+---------------+--------+------+



In [249]:
# Sort by category in descending order; within a category, cheapest product first.
pyspark_df_products.orderBy(
    pyspark_df_products['category'].desc(),
    pyspark_df_products['price'].asc(),
).show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  1|           iPhone 12|    Electronics|      10|899.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+



In [220]:
# Three ways of dropping rows that contain nulls; each call only displays its result.
pyspark_df_products.na.drop(thresh=3).show() # thresh=3: keep only rows with at least 3 non-null values
pyspark_df_products.na.drop(how='any').show() # how='any': drop a row if any value is null ('all': only if every value is null)
pyspark_df_products.na.drop(subset=['quantity']).show() # only look for nulls in the listed column(s)

In [250]:
# Replace nulls with 0, but only in the quantity and price columns.
pyspark_df_products.na.fill(0, ['quantity', 'price']).show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
+---+--------------------+---------------+--------+------+



In [251]:
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format

In [250]:
# NOTE(review): `df` is not defined anywhere in the visible notebook, and neither the
# products nor the users DataFrame has a 'Date' or 'Close' column. A DataFrame with
# those columns must be loaded into `df` before this cell, otherwise it raises
# NameError on a fresh kernel.

# Extract individual components of the 'Date' column.
df.select(dayofmonth(df['Date'])).show()
df.select(hour(df['Date'])).show()
df.select(dayofyear(df['Date'])).show()
df.select(month(df['Date'])).show()

# Derive the year once and reuse it for the preview and for the aggregation.
newdf = df.withColumn("Year",year(df['Date']))
newdf.show()

# Per-year mean of every numeric column, narrowed to the two columns of interest.
# Computed once and reused instead of repeating the same groupBy twice.
yearly_means = newdf.groupBy("Year").mean()[['avg(Year)','avg(Close)']]
yearly_means.show()

# show() returns None, so the formatted DataFrame is stored first and displayed in a
# separate step. A dedicated name is used so the `result` list collected by the
# earlier groupBy cell is not overwritten.
yearly_mean_close = (
    yearly_means
    .withColumnRenamed("avg(Year)","Year")
    .select('Year',format_number('avg(Close)',2).alias("Mean Close"))
)
yearly_mean_close.show()