In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark_sql").getOrCreate()

print("Create a sample RDD")
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob"), (3, "Charlie")])

print("Create a PySpark DataFrame from RDD")
df = spark.createDataFrame(rdd, ["id", "name"])

print("Print the contents of the DataFrame")
df.show()

24/10/01 14:00:32 WARN Utils: Your hostname, LAP-0285 resolves to a loopback address: 127.0.1.1; using 192.168.0.105 instead (on interface wlp0s20f3)
24/10/01 14:00:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/01 14:00:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Create a sample RDD
Create a PySpark DataFrame from RDD


                                                                                

Print the contents of the DataFrame
+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



# From external data sources#

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark_sql").getOrCreate()

print(f'Import "file.csv" into PySpark DataFrame')
df = spark.read.csv("file.csv", header=True, inferSchema=True)

print("Print the contents of the PySpark DataFrame")
df.show(10)

Import "file.csv" into PySpark DataFrame
Print the contents of the PySpark DataFrame
+-----------------+--------------------+--------------------+---+
|             name|               email|                city|age|
+-----------------+--------------------+--------------------+---+
|     Keeley Bosco|katlyn@jenkinsmag...|     Lake Gladysberg| 22|
| Dr. Araceli Lang|mavis_lehner@jaco...|         Yvettemouth| 34|
|    Terrell Boyle|augustine.conroy@...|     Port Reaganfort| 36|
|Alessandro Barton|sigurd.hudson@hod...|         South Pearl| 21|
|      Keven Purdy|carter_zboncak@sc...|Port Marjolaineshire| 33|
+-----------------+--------------------+--------------------+---+



# From a pandas DataFrame


In [7]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import Row
spark = SparkSession.builder.appName("pyspark_sql").getOrCreate()

print("Create a Python dictionary")
data = {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"], "age": [25, 30, 35]}

print("Create a Pandas DataFrame")
pandas_df = pd.DataFrame(data)

print("Convert Pandas DataFrame to PySpark DataFrame")
rows = [Row(**row) for row in pandas_df.to_dict(orient='records')]
print("rows: ",rows)
df = spark.createDataFrame(rows)

print("Print the contents of the PySpark DataFrame")
df.show()

Create a Python dictionary
Create a Pandas DataFrame
Convert Pandas DataFrame to PySpark DataFrame
rows:  [Row(id=1, name='Alice', age=25), Row(id=2, name='Bob', age=30), Row(id=3, name='Charlie', age=35)]
Print the contents of the PySpark DataFrame
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



# DataFrame transformations

select(): Selects columns from the DataFrame.

filter(): Filters rows based on a condition.

withColumn(): Adds or replaces a column in the DataFrame.

groupBy(): Groups the DataFrame by specified columns.

orderBy(): Sorts the DataFrame by specified columns.

drop(): Drops specified columns from the DataFrame.

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameTransformations").getOrCreate()

print(f'Import "file.csv" into PySpark DataFrame')
df = spark.read.csv("file.csv", header=True, inferSchema=True)

print("Showing the contents of the DataFrame:")
df.select("name", "city").show()

print("Filtering for rows whose age is above 21:")
df.filter(df["age"] > 21).show()

print("Creating a new column by subtracting 100 from the person's age:")
df.withColumn('new_column', 100 - df['age']).show()

print("Grouping by city and then taking an average of age from that grouped city:")
df.groupBy("city").agg({'age': 'sum'}).show()

print("Sorting the DataFrame by age and printing the DataFrame:")
df.orderBy('age').show()

print("Removing email and city columns and printing the rest of the columns:")
df.drop('email', 'city').show()

Import "file.csv" into PySpark DataFrame
Showing the contents of the DataFrame:
+-----------------+--------------------+
|             name|                city|
+-----------------+--------------------+
|     Keeley Bosco|     Lake Gladysberg|
| Dr. Araceli Lang|         Yvettemouth|
|    Terrell Boyle|     Port Reaganfort|
|Alessandro Barton|         South Pearl|
|      Keven Purdy|Port Marjolaineshire|
+-----------------+--------------------+

Filtering for rows whose age is above 21:
+----------------+--------------------+--------------------+---+
|            name|               email|                city|age|
+----------------+--------------------+--------------------+---+
|    Keeley Bosco|katlyn@jenkinsmag...|     Lake Gladysberg| 22|
|Dr. Araceli Lang|mavis_lehner@jaco...|         Yvettemouth| 34|
|   Terrell Boyle|augustine.conroy@...|     Port Reaganfort| 36|
|     Keven Purdy|carter_zboncak@sc...|Port Marjolaineshire| 33|
+----------------+--------------------+--------------

# DataFrame actions

show(): Displays the first n rows of the DataFrame.

count(): Counts the number of rows in the DataFrame.

collect(): Retrieves all the data in the DataFrame as a list.

first(): Retrieves the first row of the DataFrame.

take(n): Retrieves the first n rows of the DataFrame.

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

print(f'Import "file.csv" into PySpark DataFrame')
df = spark.read.csv("file.csv", header=True, inferSchema=True)

print("Printing the contents of the DataFrame:")
df.show()

print("Showing the number of rows in the DataFrame:")
print(df.count())

print("Retrieving all the data in the DataFrame as a list instead of a table:")
print(df.collect())

print("Showing the first row of the DataFrame:")
print(df.first())

print("Showing the first two rows of the DataFrame:")
print(df.take(2))

print("Filtering for rows whose age is above 21:")
df.filter(df['age'] > 21).show()

Import "file.csv" into PySpark DataFrame
Printing the contents of the DataFrame:
+-----------------+--------------------+--------------------+---+
|             name|               email|                city|age|
+-----------------+--------------------+--------------------+---+
|     Keeley Bosco|katlyn@jenkinsmag...|     Lake Gladysberg| 22|
| Dr. Araceli Lang|mavis_lehner@jaco...|         Yvettemouth| 34|
|    Terrell Boyle|augustine.conroy@...|     Port Reaganfort| 36|
|Alessandro Barton|sigurd.hudson@hod...|         South Pearl| 21|
|      Keven Purdy|carter_zboncak@sc...|Port Marjolaineshire| 33|
+-----------------+--------------------+--------------------+---+

Showing the number of rows in the DataFrame:
5
Retrieving all the data in the DataFrame as a list instead of a table:
[Row(name='Keeley Bosco', email='katlyn@jenkinsmaggio.net', city='Lake Gladysberg', age=22), Row(name='Dr. Araceli Lang', email='mavis_lehner@jacobi.name', city='Yvettemouth', age=34), Row(name='Terrell Boy

# Joins

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameActions").getOrCreate()

print(f'Import "file.csv" into PySpark DataFrame')
df1 = spark.read.csv("file.csv", header=True, inferSchema=True)

print(f'Import "file2.csv" into PySpark DataFrame')
df2 = spark.read.csv("file2.csv", header=True, inferSchema=True)

print("Join dataframe 2 with DataFrame 1 using inner")
joined_df = df1.join(df2, on="id", how="inner")

print("Showing the joined DataFrame using inner join:")
joined_df.show()

print("Join dataframe 2 with DataFrame 1 using outer")
joined_df = df1.join(df2, on="id", how="outer")

print("Showing the joined DataFrame using outer join:")
joined_df.show()

Import "file.csv" into PySpark DataFrame
Import "file2.csv" into PySpark DataFrame
Join dataframe 2 with DataFrame 1 using inner
Showing the joined DataFrame using inner join:
+---+-----------------+--------------------+--------------------+---+------------+
| id|             name|               email|                city|age|credit_score|
+---+-----------------+--------------------+--------------------+---+------------+
|  1|     Keeley Bosco|katlyn@jenkinsmag...|     Lake Gladysberg| 22|         650|
|  2| Dr. Araceli Lang|mavis_lehner@jaco...|         Yvettemouth| 34|         720|
|  3|    Terrell Boyle|augustine.conroy@...|     Port Reaganfort| 36|         800|
|  4|Alessandro Barton|sigurd.hudson@hod...|         South Pearl| 21|         600|
|  5|      Keven Purdy|carter_zboncak@sc...|Port Marjolaineshire| 33|         750|
+---+-----------------+--------------------+--------------------+---+------------+

Join dataframe 2 with DataFrame 1 using outer
Showing the joined DataFrame u

# Aggregate functions

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

print(f'Import "file.csv" into PySpark DataFrame')
df = spark.read.csv("emp.csv", header=True, inferSchema=True)

print("Printing the contents of the DataFrame")
df.show()

print("Create a `WindowSpec` object")
windowSpec = Window.partitionBy('department')

print("Printing the output from Windows function")
df.withColumn("avg_salary", avg('salary').over(windowSpec)).show()

Import "file.csv" into PySpark DataFrame
Printing the contents of the DataFrame
+---+----------+--------+------+
| id|department|employee|salary|
+---+----------+--------+------+
|  1|        IT|    John| 50000|
|  2|        IT|    Jane| 55000|
|  3|        IT|     Bob| 60000|
|  4|        HR|   Alice| 65000|
|  5|        HR|     Eve| 70000|
|  6|        HR| Charles| 75000|
|  7|   Finance|  Oliver| 80000|
|  8|   Finance|   Ellen| 85000|
|  9|   Finance|   Frank| 90000|
| 10| Marketing|   Grace| 95000|
| 11| Marketing|   Henry|100000|
| 12| Marketing|     Ivy|105000|
+---+----------+--------+------+

Create a `WindowSpec` object
Printing the output from Windows function
+---+----------+--------+------+----------+
| id|department|employee|salary|avg_salary|
+---+----------+--------+------+----------+
|  7|   Finance|  Oliver| 80000|   85000.0|
|  8|   Finance|   Ellen| 85000|   85000.0|
|  9|   Finance|   Frank| 90000|   85000.0|
|  4|        HR|   Alice| 65000|   70000.0|
|  5|       

# Ranking functions

In [14]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number

print("Create a `WindowSpec` object")
windowSpec = Window.partitionBy('sales_person').orderBy('sales_amount')

print(f'Import "file.csv" into PySpark DataFrame')
df = spark.read.csv("sales.csv", header=True, inferSchema=True)

print("Assigning rank to each row:")
df.withColumn('rank', rank().over(windowSpec)).show()

print("Assigning dense rank to each row:")
df.withColumn('dense_rank', dense_rank().over(windowSpec)).show()

print("Assigning row number to each row:")
df.withColumn('row_number', row_number().over(windowSpec)).show()

Create a `WindowSpec` object
Import "file.csv" into PySpark DataFrame
Assigning rank to each row:
+---+----------+------------+------------+----+
| id|sales_date|sales_person|sales_amount|rank|
+---+----------+------------+------------+----+
|  3|2022-01-02|       Alice|         750|   1|
|  5|2022-01-03|       Alice|         800|   2|
|  7|2022-01-04|       Alice|         900|   3|
|  1|2022-01-01|       Alice|        1000|   4|
|  9|2022-01-05|       Alice|        1500|   5|
|  2|2022-01-01|         Bob|         500|   1|
|  6|2022-01-03|         Bob|         600|   2|
| 10|2022-01-05|         Bob|         800|   3|
|  8|2022-01-04|         Bob|        1100|   4|
|  4|2022-01-02|         Bob|        1200|   5|
+---+----------+------------+------------+----+

Assigning dense rank to each row:
+---+----------+------------+------------+----------+
| id|sales_date|sales_person|sales_amount|dense_rank|
+---+----------+------------+------------+----------+
|  3|2022-01-02|       Alice|    

# Analytic functions
Analytic functions compute a value for each input row based on a group of related rows. Commonly used analytic functions include lead(), lag(), and ntile().

In [16]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, lag, ntile

print("Create a `WindowSpec` object")
windowSpec = Window.partitionBy('sales_person').orderBy('sales_amount')

print(f'Import "file.csv" into PySpark DataFrame')
df = spark.read.csv("sales.csv", header=True, inferSchema=True)

print("Next month's sales for each category:")
df.withColumn('next_month_sales', lead('sales_amount', 1).over(windowSpec)).show()

print("Previous month's sales for each category:")
df.withColumn('prev_month_sales', lag('sales_amount', 1).over(windowSpec)).show()

print("Quartiles of sales for each category:")
df.withColumn('quartile', ntile(4).over(windowSpec)).show()

Create a `WindowSpec` object
Import "file.csv" into PySpark DataFrame
Next month's sales for each category:
+---+----------+------------+------------+----------------+
| id|sales_date|sales_person|sales_amount|next_month_sales|
+---+----------+------------+------------+----------------+
|  3|2022-01-02|       Alice|         750|             800|
|  5|2022-01-03|       Alice|         800|             900|
|  7|2022-01-04|       Alice|         900|            1000|
|  1|2022-01-01|       Alice|        1000|            1500|
|  9|2022-01-05|       Alice|        1500|            null|
|  2|2022-01-01|         Bob|         500|             600|
|  6|2022-01-03|         Bob|         600|             800|
| 10|2022-01-05|         Bob|         800|            1100|
|  8|2022-01-04|         Bob|        1100|            1200|
|  4|2022-01-02|         Bob|        1200|            null|
+---+----------+------------+------------+----------------+

Previous month's sales for each category:
+---+----