# PySpark with Cassandra

## PySpark is the Python API for Apache Spark, a powerful framework for large-scale data processing

In [1]:
from pyspark.sql import SparkSession # This imports the SparkSession class from the PySpark library.

# Lets Initialize a new SparkSession, which is the entry point to programming Spark with the Dataset and DataFrame API.
# .builder: This starts the process of creating a SparkSession. 
# .appName("Customer Dataset Analysis"): Sets the name of the Spark application as “Customer Dataset Analysis.” This name appears in logs, the Spark UI, and other places. 
# .getOrCreate(): Either returns an existing SparkSession if one already exists in the current context, or creates a new one.

spark = SparkSession.builder \
    .appName("ReadFromCassandra") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .config("spark.cassandra.connection.port", "9042") \
    .getOrCreate()

In [2]:
# confirms that the spark instance is configured and running
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000013A92181280>


In [3]:
# Spark DataFrame called df, which holds the data for processing

df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="customers", keyspace="pyspark_keyspace") \
    .load()

In [4]:
df.show()

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|0e07a711-9338-450...|6238 Elm St, Los ...|   914312.12|      Jane|    Jones|                 623.35| 3015.4|
|8489da6b-9076-4e4...|39 Main St, Los A...|    333407.9|     Alice|      Doe|                  137.1|4818.09|
|685b993f-571d-411...|3858 Pine Ave, Ph...|    901010.6|   Charlie|    Smith|                 285.31|4007.45|
|ad347a5c-6c89-4e1...|8983 Oak St, Houston|    674981.9|     Emily|  Johnson|                 746.17|4457.55|
|da0c930e-1238-4c6...|7611 Maple Blvd, ...|   749505.48|     David|    Smith|                  63.65|9220.52|
|0ae62535-bb04-443...|8336 Pine Ave, Lo...|     45218.1|      Jane|    Brown|                 494.71|4550.61|
|e24d89fe-

In [5]:
# This prints the schema (structure) of your DataFrame df in a tree format.
# It shows: Column names Data types Whether a column can contain null values

df.printSchema()

root
 |-- customer_id: string (nullable = false)
 |-- address: string (nullable = true)
 |-- assets_value: double (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- purchase_amount_per_day: double (nullable = true)
 |-- salary: double (nullable = true)



# Basic Actions and Select

## Transformations and Actions

<table border="1" cellpadding="5" cellspacing="0">
  <tr>
    <th>Type</th>
    <th>Method</th>
    <th>Explanation</th>
  </tr>
  <tr>
    <td>Transformation</td>
    <td>filter()</td>
    <td>Filters rows based on a condition.</td>
  </tr>
  <tr>
    <td>Transformation</td>
    <td>select()</td>
    <td>Selects specific columns from the DataFrame.</td>
  </tr>
  <tr>
    <td>Transformation</td>
    <td>withColumn()</td>
    <td>Adds a new column or replaces an existing one.</td>
  </tr>
  <tr>
    <td>Transformation</td>
    <td>drop()</td>
    <td>Drops one or more columns.</td>
  </tr>
  <tr>
    <td>Transformation</td>
    <td>groupBy()</td>
    <td>Groups rows by a specific column for aggregation.</td>
  </tr>
  <tr>
    <td>Transformation</td>
    <td>orderBy()</td>
    <td>Sorts rows by a column or expression.</td>
  </tr>
  <tr>
    <td>Action</td>
    <td>show()</td>
    <td>Displays the first few rows of the DataFrame.</td>
  </tr>
  <tr>
    <td>Action</td>
    <td>count()</td>
    <td>Returns the number of rows in the DataFrame.</td>
  </tr>
  <tr>
    <td>Action</td>
    <td>collect()</td>
    <td>Returns all rows as a list (caution: can be expensive for large datasets!).</td>
  </tr>
  <tr>
    <td>Action</td>
    <td>take(n)</td>
    <td>Returns the first n rows as a list.</td>
  </tr>
  <tr>
    <td>Action</td>
    <td>write()</td>
    <td>Writes the DataFrame to storage (e.g., as CSV, Parquet, etc.).</td>
  </tr>
</table>


In [6]:
df.show(5) # Show first 5 rows

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|754e9d3c-136d-4c8...|123 Oak St, Los A...|    75236.43|      John|    Jones|                 627.66|8118.38|
|3953f8c0-4097-41f...|6133 Pine Ave, Ne...|   369825.66|       Bob|  Johnson|                 420.54|9603.33|
|02387a88-eeb1-48c...|4217 Pine Ave, Ph...|    689558.9|   Michael|    Smith|                 817.96|7244.67|
|0e07a711-9338-450...|6238 Elm St, Los ...|   914312.12|      Jane|    Jones|                 623.35| 3015.4|
|8489da6b-9076-4e4...|39 Main St, Los A...|    333407.9|     Alice|      Doe|                  137.1|4818.09|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
only showi

In [7]:
df.printSchema() # Show schema

root
 |-- customer_id: string (nullable = false)
 |-- address: string (nullable = true)
 |-- assets_value: double (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- purchase_amount_per_day: double (nullable = true)
 |-- salary: double (nullable = true)



In [8]:
df.count() # Number of rows

102

In [9]:
df.columns # List of columns

['customer_id',
 'address',
 'assets_value',
 'first_name',
 'last_name',
 'purchase_amount_per_day',
 'salary']

In [10]:
df.select("first_name").show(5) # Select one column

+----------+
|first_name|
+----------+
|      John|
|   Michael|
|   Charlie|
|     David|
|     Emily|
+----------+
only showing top 5 rows



In [11]:
df.select("first_name", "last_name").show(5) # Select multiple columns

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Olivia|  Johnson|
|   Michael|    Davis|
|     David|   Miller|
|     Emily|   Garcia|
|     Emily| Williams|
+----------+---------+
only showing top 5 rows



In [12]:
df.head(3) # Get first 3 rows as a list

[Row(customer_id='c0d0534a-cd6e-47a0-869e-2df08d8b6e6f', address='5060 Elm St, New York', assets_value=675089.59, first_name='John', last_name='Williams', purchase_amount_per_day=941.71, salary=8413.3),
 Row(customer_id='7a119254-fa90-46ef-97a5-4c94acd1cd9b', address='3286 Maple Blvd, Phoenix', assets_value=168727.34, first_name='Michael', last_name='Garcia', purchase_amount_per_day=868.08, salary=6211.16),
 Row(customer_id='c085f64c-a96f-4a93-8ff1-f7554f12760c', address='8300 Elm St, Chicago', assets_value=631782.27, first_name='Olivia', last_name='Taylor', purchase_amount_per_day=651.27, salary=9981.72)]

In [13]:
df.describe().show() # Summary statistics for numeric columns

+-------+--------------------+--------------------+-----------------+----------+---------+-----------------------+------------------+
|summary|         customer_id|             address|     assets_value|first_name|last_name|purchase_amount_per_day|            salary|
+-------+--------------------+--------------------+-----------------+----------+---------+-----------------------+------------------+
|  count|                 102|                 102|              102|       102|      102|                    102|               102|
|   mean|                NULL|                NULL|501218.3768627451|      NULL|     NULL|      500.7642156862746| 6685.184215686274|
| stddev|                NULL|                NULL|283491.6815118589|      NULL|     NULL|     286.29281966710784|2023.7110786626272|
|    min|001106b4-eca2-4dd...|1 Pine Ave, Los A...|         15463.39|     Alice|    Brown|                  56.12|            3015.4|
|    max|ff28f413-cac2-482...|9849 Maple Blvd, ...|        996

In [14]:
df.dropDuplicates(["first_name", "last_name"]).show() # Drop duplicate based on subset of columns

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|8489da6b-9076-4e4...|39 Main St, Los A...|    333407.9|     Alice|      Doe|                  137.1|4818.09|
|70ad1e44-ca61-477...|2805 Pine Ave, Ne...|    550679.1|     Alice|   Garcia|                 261.33|9444.34|
|cb816669-351f-45c...|9633 Elm St, Los ...|    15463.39|     Alice|    Smith|                 600.28|9892.78|
|2bb52b82-4f81-421...|935 Main St, Los ...|   445681.62|     Alice| Williams|                 272.24|7320.36|
|3953f8c0-4097-41f...|6133 Pine Ave, Ne...|   369825.66|       Bob|  Johnson|                 420.54|9603.33|
|e24d89fe-d2d8-44e...|9659 Elm St, New ...|   845708.49|       Bob|    Jones|                 757.81|7329.79|
|d5e3cd2b-

In [15]:
df.sample(fraction=0.1, seed=42).show() # Sample 10% of data

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|cad8919d-6afa-40d...|7065 Main St, Los...|   330486.84|     Alice| Williams|                 555.62|6277.64|
|754e9d3c-136d-4c8...|123 Oak St, Los A...|    75236.43|      John|    Jones|                 627.66|8118.38|
|3953f8c0-4097-41f...|6133 Pine Ave, Ne...|   369825.66|       Bob|  Johnson|                 420.54|9603.33|
|ad347a5c-6c89-4e1...|8983 Oak St, Houston|    674981.9|     Emily|  Johnson|                 746.17|4457.55|
|8adc85dd-05f8-494...|9365 Maple Blvd, ...|   604333.54|   Michael|  Johnson|                 666.24| 6445.7|
|e2d6a120-444d-436...|5253 Pine Ave, Ph...|   303791.39|     Alice|   Garcia|                 158.91|7383.59|
|cb816669-

In [16]:
df.selectExpr("salary * 12 as annual_salary").show(5) # Use SQL expression in select

+------------------+
|     annual_salary|
+------------------+
|          97420.56|
|115239.95999999999|
| 86936.04000000001|
|100959.59999999999|
|          74533.92|
+------------------+
only showing top 5 rows



In [17]:
df.describe("assets_value").show() # Summary stats on one column

+-------+------------------+
|summary|      assets_value|
+-------+------------------+
|  count|               102|
|   mean| 501218.3768627451|
| stddev|283491.68151185894|
|    min|          15463.39|
|    max|         996535.31|
+-------+------------------+



In [18]:
df.summary("min", "max").show() # Min and max values

+-------+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|summary|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+-------+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|    min|001106b4-eca2-4dd...|1 Pine Ave, Los A...|    15463.39|     Alice|    Brown|                  56.12| 3015.4|
|    max|ff28f413-cac2-482...|9849 Maple Blvd, ...|   996535.31|    Sophia| Williams|                 991.79|9981.72|
+-------+--------------------+--------------------+------------+----------+---------+-----------------------+-------+



In [19]:
df.filter(df.salary > 8000).show(5) # Filter salary > 8000

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|b9a7cb68-647c-46d...|3243 Maple Blvd, ...|   992926.44|   Charlie| Williams|                 100.57|8110.49|
|e2a373c3-01f3-432...|754 Pine Ave, Pho...|   263018.54|   Michael|    Davis|                 207.93|8174.85|
|faad8779-32f1-4f7...|9757 Elm St, New ...|   675012.21|   Charlie| Williams|                  95.01| 9966.1|
|a045bef9-b281-442...|3786 Oak St, Los ...|   744804.54|    Olivia|      Doe|                 199.58|8491.88|
|5c18aff6-5c5a-4a8...|3902 Oak St, New ...|   314844.16|    Olivia|    Brown|                 170.57|9021.98|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
only showi

In [20]:
df.where(df.last_name == "Smith").show(5) # Filter with where

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|10c2e39b-275b-41d...|9239 Elm St, New ...|   143164.58|       Bob|    Smith|                 917.92|4208.18|
|cb816669-351f-45c...|9633 Elm St, Los ...|    15463.39|     Alice|    Smith|                 600.28|9892.78|
|56a19b3d-1b9b-441...|1541 Maple Blvd, ...|   189084.84|   Michael|    Smith|                 932.98|9559.29|
|de040c86-0422-4f7...|7840 Main St, New...|   111917.41|     Emily|    Smith|                 569.39|7307.63|
|1125910a-60ba-481...|8495 Main St, Pho...|   696736.52|     Emily|    Smith|                 253.62|8470.16|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
only showi

In [21]:
df.select("first_name", "salary").orderBy("salary", ascending=False).show(5) # Sort descending by salary

+----------+-------+
|first_name| salary|
+----------+-------+
|    Olivia|9981.72|
|   Charlie| 9966.1|
|      John|9913.39|
|     Alice|9892.78|
|     David| 9790.8|
+----------+-------+
only showing top 5 rows



In [22]:
df.limit(10).show() # Limit number of rows

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|c0d0534a-cd6e-47a...|5060 Elm St, New ...|   675089.59|      John| Williams|                 941.71| 8413.3|
|7a119254-fa90-46e...|3286 Maple Blvd, ...|   168727.34|   Michael|   Garcia|                 868.08|6211.16|
|c085f64c-a96f-4a9...|8300 Elm St, Chicago|   631782.27|    Olivia|   Taylor|                 651.27|9981.72|
|001106b4-eca2-4dd...|1148 Oak St, New ...|   706468.19|   Charlie|    Davis|                 129.05|7840.35|
|9f7b6f23-c923-4c2...|8604 Maple Blvd, ...|   968805.16|    Olivia|    Davis|                 604.93| 5145.2|
|70ad1e44-ca61-477...|2805 Pine Ave, Ne...|    550679.1|     Alice|   Garcia|                 261.33|9444.34|
|5710091b-

# Column Operations and Transformations

In [23]:
df.withColumn("annual_salary", df.salary * 12).show(5) # Add new column

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+------------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|     annual_salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+------------------+
|754e9d3c-136d-4c8...|123 Oak St, Los A...|    75236.43|      John|    Jones|                 627.66|8118.38|          97420.56|
|3953f8c0-4097-41f...|6133 Pine Ave, Ne...|   369825.66|       Bob|  Johnson|                 420.54|9603.33|115239.95999999999|
|02387a88-eeb1-48c...|4217 Pine Ave, Ph...|    689558.9|   Michael|    Smith|                 817.96|7244.67| 86936.04000000001|
|1125910a-60ba-481...|8495 Main St, Pho...|   696736.52|     Emily|    Smith|                 253.62|8470.16|         101641.92|
|1011d0c2-2e47-43a...|3108 Oak St, Los ...|   655369.65|    Olivia|      Doe|                 481

In [24]:
df.withColumnRenamed("salary", "monthly_salary").show(5) # Rename column

+--------------------+--------------------+------------+----------+---------+-----------------------+--------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day|monthly_salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+--------------+
|cc1e1429-acb8-43c...|6315 Pine Ave, Ph...|    777854.3|   Michael|   Miller|                 172.94|       8292.36|
|10c2e39b-275b-41d...|9239 Elm St, New ...|   143164.58|       Bob|    Smith|                 917.92|       4208.18|
|5d0cffdd-e574-4dc...|2580 Maple Blvd, ...|   739123.47|     David|  Johnson|                  92.82|       5892.88|
|72b38361-62f4-4d4...|7077 Oak St, New ...|   853211.33|     Alice|    Smith|                 315.69|       6224.15|
|e2d6a120-444d-436...|5253 Pine Ave, Ph...|   303791.39|     Alice|   Garcia|                 158.91|       7383.59|
+--------------------+--------------------+------------+--------

In [25]:
from pyspark.sql import functions as F

df.withColumn("full_name", F.concat_ws(" ", df.first_name, df.last_name)).show(5) # Concatenate columns

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+--------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|     full_name|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+--------------+
|9fe9abf9-621c-4e2...|5838 Maple Blvd, ...|   319594.85|   Charlie|   Taylor|                 500.79|9187.35|Charlie Taylor|
|a513c743-502e-475...|4011 Oak St, Houston|   466080.95|      John|   Taylor|                 386.43| 7936.1|   John Taylor|
|63c6073c-8af1-414...|2675 Elm St, Los ...|    63886.02|   Michael|    Brown|                 519.86|3746.47| Michael Brown|
|a3014e1b-b287-4ff...|4996 Oak St, Los ...|    688935.4|     David|    Davis|                 402.19|3534.56|   David Davis|
|96a0695c-f890-49e...|3552 Oak St, New ...|    427849.7|   Michael|    Smith|                 197.41|7084.36| Michael Smith|


In [26]:
df.withColumn("salary_rounded", F.round(df.salary, 0)).show(5) # Round salary

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+--------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|salary_rounded|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+--------------+
|a045bef9-b281-442...|3786 Oak St, Los ...|   744804.54|    Olivia|      Doe|                 199.58|8491.88|        8492.0|
|5c18aff6-5c5a-4a8...|3902 Oak St, New ...|   314844.16|    Olivia|    Brown|                 170.57|9021.98|        9022.0|
|48a04704-1533-478...|9564 Pine Ave, Lo...|   722351.04|   Charlie|  Johnson|                 595.13|9520.67|        9521.0|
|d652f7b4-2c5c-4ea...|9849 Maple Blvd, ...|   343330.21|      Jane|   Taylor|                  984.3|5051.17|        5051.0|
|6ab68490-3d3f-4cc...|2248 Maple Blvd, ...|   839865.77|    Olivia| Williams|                 799.24|3323.81|        3324.0|


In [27]:
df.withColumn("salary_log", F.log(df.salary)).show(5) # Log transform

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+-----------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|       salary_log|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+-----------------+
|cb816669-351f-45c...|9633 Elm St, Los ...|    15463.39|     Alice|    Smith|                 600.28|9892.78|9.199560477129934|
|e99c1388-5df9-47e...|2464 Maple Blvd, ...|   380513.02|    Olivia| Williams|                 129.38|9339.43|9.142000501523356|
|5051db98-429b-4c2...|1 Pine Ave, Los A...|    81176.79|   Michael|   Garcia|                 715.95|5502.64|8.612983256057413|
|09584401-2cc3-475...|4643 Maple Blvd, ...|   262846.98|     Emily|    Davis|                 441.69|9517.67|9.160905349930596|
|e694f6de-ea3d-40b...|361 Oak St, New York|   762254.45|      John| Williams|                 342.36|765

In [28]:
df.select("first_name", "salary").distinct().show() # Unique first name-salary combos

+----------+-------+
|first_name| salary|
+----------+-------+
|   Michael|5282.38|
|     Emily|8120.14|
|     Emily|4513.49|
|     David| 9790.8|
|       Bob|4514.51|
|    Sophia| 4210.6|
|   Charlie|5411.79|
|      John|4762.98|
|     David|8756.06|
|   Charlie|3778.29|
|     David|4997.36|
|    Sophia|6099.39|
|   Charlie|7916.72|
|     Emily|8704.96|
|    Sophia|7837.73|
|   Charlie|6556.81|
|      Jane|9519.42|
|    Olivia|4716.79|
|       Bob|5150.91|
|     Emily|7307.63|
+----------+-------+
only showing top 20 rows



In [29]:
df.filter(F.col("assets_value") > 100000).show(5) # Filter assets_value

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|c0d0534a-cd6e-47a...|5060 Elm St, New ...|   675089.59|      John| Williams|                 941.71| 8413.3|
|7a119254-fa90-46e...|3286 Maple Blvd, ...|   168727.34|   Michael|   Garcia|                 868.08|6211.16|
|c085f64c-a96f-4a9...|8300 Elm St, Chicago|   631782.27|    Olivia|   Taylor|                 651.27|9981.72|
|001106b4-eca2-4dd...|1148 Oak St, New ...|   706468.19|   Charlie|    Davis|                 129.05|7840.35|
|9f7b6f23-c923-4c2...|8604 Maple Blvd, ...|   968805.16|    Olivia|    Davis|                 604.93| 5145.2|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
only showi

In [30]:
df.filter(F.col("first_name").isin("Bob")).show() # Filter with isin

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+
|10c2e39b-275b-41d...|9239 Elm St, New ...|   143164.58|       Bob|    Smith|                 917.92|4208.18|
|3953f8c0-4097-41f...|6133 Pine Ave, Ne...|   369825.66|       Bob|  Johnson|                 420.54|9603.33|
|d5e3cd2b-610b-48f...|9635 Elm St, Houston|   390784.68|       Bob|   Miller|                 898.33|5150.91|
|16a9a5f0-2741-4ba...|5605 Pine Ave, Lo...|   857436.38|       Bob|   Taylor|                 969.06|4514.51|
|e24d89fe-d2d8-44e...|9659 Elm St, New ...|   845708.49|       Bob|    Jones|                 757.81|7329.79|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+



In [31]:
df.withColumn("is_high_salary", F.when(df.salary > 8000, True).otherwise(False)).show(5)

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+--------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|is_high_salary|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+--------------+
|c0d0534a-cd6e-47a...|5060 Elm St, New ...|   675089.59|      John| Williams|                 941.71| 8413.3|          true|
|7a119254-fa90-46e...|3286 Maple Blvd, ...|   168727.34|   Michael|   Garcia|                 868.08|6211.16|         false|
|c085f64c-a96f-4a9...|8300 Elm St, Chicago|   631782.27|    Olivia|   Taylor|                 651.27|9981.72|          true|
|001106b4-eca2-4dd...|1148 Oak St, New ...|   706468.19|   Charlie|    Davis|                 129.05|7840.35|         false|
|9f7b6f23-c923-4c2...|8604 Maple Blvd, ...|   968805.16|    Olivia|    Davis|                 604.93| 5145.2|         false|


In [32]:
df.select(F.countDistinct("last_name")).show() # Count distinct last names

+-------------------------+
|count(DISTINCT last_name)|
+-------------------------+
|                       10|
+-------------------------+



In [33]:
df.select(F.mean("salary"), F.min("salary"), F.max("salary")).show() # Mean, min, max

+-----------------+-----------+-----------+
|      avg(salary)|min(salary)|max(salary)|
+-----------------+-----------+-----------+
|6685.184215686274|     3015.4|    9981.72|
+-----------------+-----------+-----------+



In [34]:
df.withColumn("salary_in_k", df.salary / 1000).show(5)

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+-----------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|salary_in_k|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+-----------+
|1125910a-60ba-481...|8495 Main St, Pho...|   696736.52|     Emily|    Smith|                 253.62|8470.16|    8.47016|
|1011d0c2-2e47-43a...|3108 Oak St, Los ...|   655369.65|    Olivia|      Doe|                 481.71|9708.16|    9.70816|
|d920e993-7616-41c...|4517 Pine Ave, Ch...|   159208.98|    Sophia| Williams|                 864.96|5307.29|    5.30729|
|18f478a5-2b59-4b2...|6367 Main St, Pho...|   607651.75|     Emily| Williams|                 755.51|6815.21|    6.81521|
|86d99ab8-3f72-4fe...|725 Main St, Los ...|   505158.31|    Sophia|    Smith|                 918.66|7464.95|    7.46495|
+--------------------+--

In [35]:
df.withColumn("salary_plus_assets", df.salary + df.assets_value).show(5)

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+------------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|salary_plus_assets|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+------------------+
|5d0cffdd-e574-4dc...|2580 Maple Blvd, ...|   739123.47|     David|  Johnson|                  92.82|5892.88|         745016.35|
|72b38361-62f4-4d4...|7077 Oak St, New ...|   853211.33|     Alice|    Smith|                 315.69|6224.15|         859435.48|
|e2d6a120-444d-436...|5253 Pine Ave, Ph...|   303791.39|     Alice|   Garcia|                 158.91|7383.59|311174.98000000004|
|6974cc7e-74ab-4d1...|1522 Main St, New...|    63885.36|   Michael| Williams|                 854.13|6239.42|          70124.78|
|39aa710c-7451-4c6...|8494 Oak St, Phoenix|    44695.27|     David|    Davis|                 991

# Aggregations and GroupBy

In [36]:
df.groupBy("last_name").count().show() # Count per last name

+---------+-----+
|last_name|count|
+---------+-----+
|    Jones|    4|
|    Smith|   14|
|  Johnson|    7|
|    Davis|   12|
| Williams|   19|
|   Garcia|    9|
|    Brown|   11|
|   Miller|    7|
|      Doe|    7|
|   Taylor|   12|
+---------+-----+



In [37]:
df.groupBy("last_name").agg(F.avg("salary").alias("avg_salary")).show()

+---------+------------------+
|last_name|        avg_salary|
+---------+------------------+
|    Davis| 6283.194999999999|
| Williams| 6712.712105263157|
|   Miller|           7115.67|
|   Garcia| 6194.517777777778|
|  Johnson| 7202.782857142857|
|   Taylor|          6485.335|
|    Jones| 6269.030000000001|
|    Smith|6827.2357142857145|
|      Doe| 6905.512857142857|
|    Brown| 6922.641818181818|
+---------+------------------+



In [38]:
df.groupBy("last_name", "first_name").agg(F.sum("purchase_amount_per_day")).show()

+---------+----------+----------------------------+
|last_name|first_name|sum(purchase_amount_per_day)|
+---------+----------+----------------------------+
|  Johnson|    Olivia|                      665.29|
|   Miller|     David|          1122.1100000000001|
|    Davis|   Michael|           513.6800000000001|
| Williams|     Emily|                     1542.74|
|   Garcia|     Emily|                     1396.78|
|   Taylor|      Jane|                     1937.87|
|   Taylor|      John|                      448.42|
|    Brown|      John|          1711.5299999999997|
|  Johnson|   Michael|                      666.24|
|      Doe|   Charlie|                      187.56|
|    Davis|      John|                      153.21|
|    Smith|   Charlie|                      1564.5|
|    Brown|   Michael|                      764.65|
|    Davis|      Jane|                      675.28|
|    Smith|   Michael|          1948.3500000000001|
|    Davis|     David|                     1393.98|
| Williams| 

# Window Functions

In [39]:
from pyspark.sql.window import Window
window_spec = Window.partitionBy("last_name").orderBy(F.desc("salary"))
df.withColumn("rank", F.rank().over(window_spec)).show(10)

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+----+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|rank|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+----+
|1f3fcf8a-d218-45c...|295 Maple Blvd, N...|   328067.91|      John|    Brown|                 236.42|9913.39|   1|
|2779860d-d659-427...|9107 Main St, Chi...|    393769.9|     David|    Brown|                 660.29| 9790.8|   2|
|5f7dddd9-7486-435...|7953 Oak St, Phoenix|   257875.56|      John|    Brown|                 745.43|9664.46|   3|
|2c038c58-4074-461...|1441 Pine Ave, Ph...|    35767.32|     David|    Brown|                  708.3|9131.28|   4|
|5c18aff6-5c5a-4a8...|3902 Oak St, New ...|   314844.16|    Olivia|    Brown|                 170.57|9021.98|   5|
|6a35e9e0-59cf-4f9...|5050 Main St, New...|   930475.79|      Jane|    Brown|   

In [40]:
df.withColumn("avg_salary_by_last", F.avg("salary").over(Window.partitionBy("last_name"))).show(5)
# Average salary per last name  

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+------------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|avg_salary_by_last|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+------------------+
|5f7dddd9-7486-435...|7953 Oak St, Phoenix|   257875.56|      John|    Brown|                 745.43|9664.46| 6922.641818181818|
|2779860d-d659-427...|9107 Main St, Chi...|    393769.9|     David|    Brown|                 660.29| 9790.8| 6922.641818181818|
|2bb56d74-6e87-41a...|2273 Main St, Chi...|    95015.11|     Emily|    Brown|                  56.12|4513.49| 6922.641818181818|
|0fb80c1c-901c-480...|9316 Pine Ave, Lo...|   452172.58|   Michael|    Brown|                 244.79|5282.38| 6922.641818181818|
|63c6073c-8af1-414...|2675 Elm St, Los ...|    63886.02|   Michael|    Brown|                 519

In [41]:
df.withColumn("cumulative_purchase", F.sum("purchase_amount_per_day").over(Window.orderBy("salary").rowsBetween(Window.unboundedPreceding, 0))).show(5)
# Cumulative sum by salary ascending  

+--------------------+--------------------+------------+----------+---------+-----------------------+-------+-------------------+
|         customer_id|             address|assets_value|first_name|last_name|purchase_amount_per_day| salary|cumulative_purchase|
+--------------------+--------------------+------------+----------+---------+-----------------------+-------+-------------------+
|0e07a711-9338-450...|6238 Elm St, Los ...|   914312.12|      Jane|    Jones|                 623.35| 3015.4|             623.35|
|14f7fbdd-fa82-4bb...|5999 Main St, Hou...|   514902.95|     Emily|   Taylor|                 911.94| 3216.6|            1535.29|
|6ab68490-3d3f-4cc...|2248 Maple Blvd, ...|   839865.77|    Olivia| Williams|                 799.24|3323.81| 2334.5299999999997|
|b36b1045-184b-49e...|3190 Main St, Hou...|   413022.51|      Jane|      Doe|                 576.27|3504.42| 2910.7999999999997|
|a3014e1b-b287-4ff...|4996 Oak St, Los ...|    688935.4|     David|    Davis|             

# SQL 

In [42]:
df.createOrReplaceTempView("customers")
spark.sql("SELECT first_name, last_name, salary FROM customers WHERE salary > 8000").show(5)
# SQL query 

+----------+---------+-------+
|first_name|last_name| salary|
+----------+---------+-------+
|      John|    Jones|8118.38|
|       Bob|  Johnson|9603.33|
|   Charlie|   Taylor|9187.35|
|     Emily|    Smith|8470.16|
|    Olivia|      Doe|9708.16|
+----------+---------+-------+
only showing top 5 rows



In [43]:
spark.sql("SELECT last_name, AVG(salary) as avg_salary FROM customers GROUP BY last_name").show()
# SQL groupby

+---------+-----------------+
|last_name|       avg_salary|
+---------+-----------------+
| Williams|6712.712105263158|
|      Doe|6905.512857142858|
|   Taylor|         6485.335|
|    Brown|6922.641818181818|
|  Johnson|7202.782857142856|
|    Davis|         6283.195|
|   Garcia|6194.517777777777|
|    Smith|6827.235714285714|
|   Miller|          7115.67|
|    Jones|6269.030000000001|
+---------+-----------------+

