# 101 PySpark Exercises

## How to import PySpark and check the version

In [1]:
import findspark
findspark.init()

# Creating a SparkSession: A SparkSession is the entry point for using the PySpark DataFrame and SQL API.
# To create a SparkSession, use the following code
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark 101 Exercises").getOrCreate()

# Get version details
print(spark.version)

3.5.0


## How to convert the index of a PySpark DataFrame into a column

In [2]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+



In [3]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()

+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



## How to get the minimum, 25th percentile, median, 75th, and max of a numeric column

In [4]:
# Create a sample DataFrame
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+



### https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html

**pyspark.sql.DataFrame.approxQuantile**

DataFrame.approxQuantile(col: Union[str, List[str], Tuple[str]], probabilities: Union[List[float], Tuple[float]], relativeError: float) → Union[List[float], List[List[float]]][source]

Calculates the approximate quantiles of numerical columns of a DataFrame.

The result of this algorithm has the following deterministic bound: If the DataFrame has N elements and if we request the quantile at probability p up to error err, then the algorithm will return a sample x from the DataFrame so that the exact rank of x is close to (p * N). More precisely,

floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).

In [5]:
# Calculate percentiles
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])

Min:  10.0
25th percentile:  20.0
Median:  30.0
75th percentile:  50.0
Max:  86.0


## How to get frequency counts of unique items of a column

In [6]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName("PySpark Session 02").getOrCreate()

In [7]:
# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+



In [8]:
df.groupBy("job").count().show()

+---------+-----+
|      job|count|
+---------+-----+
| Engineer|    4|
|Scientist|    2|
|   Doctor|    1|
+---------+-----+



## How to Drop rows with NA values specific to a particular column

In [9]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+



### https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropna.html#pyspark.sql.DataFrame.dropna
**pyspark.sql.DataFrame.dropna**

Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.

In [10]:
df_0 = df.dropna()
df_1 = df.dropna(subset=['Value'])
df_2 = df.dropna(subset=['id'])

df_0.show()
df_1.show()
df_2.show()

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   B|    3|456|
+----+-----+---+

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B|    3| 456|
+----+-----+----+

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   B| NULL|123|
|   B|    3|456|
+----+-----+---+



## How to rename columns of a PySpark DataFrame 

In [11]:
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()

+----+-----+
|age2| name|
+----+-----+
|   2|Alice|
|   5|  Bob|
+----+-----+



In [12]:
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])
# old column names
old_names = ["col1", "col2", "col3"]
# new column names
new_names = ["new_col1", "new_col2", "new_col3"]
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [13]:
df.withColumnRenamed('col1', 'newcol1')
df.withColumnRenamed('col2', 'newcol2')
df.withColumnRenamed('col3', 'newcol3')
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [14]:
df1 = df.withColumnRenamed('col1', 'newcol1')
df2 = df1.withColumnRenamed('col2', 'newcol2')
df3 = df2.withColumnRenamed('col3', 'newcol3')
df3.show()

+-------+-------+-------+
|newcol1|newcol2|newcol3|
+-------+-------+-------+
|      1|      2|      3|
|      4|      5|      6|
+-------+-------+-------+



## How to bin a numeric list to 10 groups of equal size

In [15]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer

# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))

df.show(5)

+-------------------+
|             values|
+-------------------+
|  0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows



In [16]:
# Define the bucket boundaries
num_buckets = 10
quantiles = df.stat.approxQuantile("values", [i/num_buckets for i in range(num_buckets+1)], 0.01)

print (quantiles)


[0.0031434569487398534, 0.13079406894907553, 0.23893974653627081, 0.30357970527906064, 0.4484250584033179, 0.5844552193973006, 0.6702867696264135, 0.7290493814691454, 0.8017532427858894, 0.8505582285081454, 0.9991441647585968]


In [17]:
# Create the Bucketizer
bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")

# Apply the Bucketizer
df_buck = bucketizer.transform(df)

df_buck.show()


+-------------------+-------+
|             values|buckets|
+-------------------+-------+
|  0.619189370225301|    5.0|
| 0.5096018842446481|    4.0|
| 0.8325259388871524|    8.0|
|0.26322809041172357|    2.0|
| 0.6702867696264135|    6.0|
| 0.5173283545794627|    4.0|
| 0.9991441647585968|    9.0|
|0.06993233728279169|    0.0|
| 0.9696695610826327|    9.0|
| 0.7959575617927873|    7.0|
| 0.4484250584033179|    4.0|
| 0.6793959570375868|    6.0|
| 0.8017532427858894|    8.0|
| 0.6565552949992319|    5.0|
| 0.2515595782593636|    2.0|
| 0.2073428376111074|    1.0|
| 0.6392921379278927|    5.0|
| 0.8505582285081454|    9.0|
| 0.8184715436384177|    8.0|
| 0.7555506990689408|    7.0|
+-------------------+-------+
only showing top 20 rows



In [18]:
#Frequency table
df_result=df_buck.groupBy("buckets").count()

# Show the original and bucketed values
df_result.show(5)

+-------+-----+
|buckets|count|
+-------+-----+
|    8.0|   10|
|    0.0|    8|
|    7.0|   10|
|    4.0|   10|
|    2.0|   10|
+-------+-----+
only showing top 5 rows



## How to stack two DataFrames vertically

In [19]:
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [20]:
df_A.union(df_B).show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



## How to convert the first character of each element in a series to uppercase

In [21]:
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])
df.show()


+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



### https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.initcap.html?highlight=initcap
**pyspark.sql.functions.initcap**

Translate the first letter of each word to upper case in the sentence.

In [22]:
from pyspark.sql.functions import initcap

data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])


# Convert the first character to uppercase
df1 = df.withColumn("name", initcap(df["name"]))
df1.show()


+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



In [23]:
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+



In [24]:
df_A = df_A.withColumn("Name", initcap(df_A["Name"]))

df_A.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| Apple|    3|    5|
|Banana|    1|   10|
|Orange|    2|    8|
+------+-----+-----+



## How to calculate the number of characters in each word in a column

In [30]:
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [31]:
from pyspark.sql import functions as F

df = df.withColumn('word_length', F.length(df.name))
df.show()

+-----+-----------+
| name|word_length|
+-----+-----------+
| john|          4|
|alice|          5|
|  bob|          3|
+-----+-----------+



## How to compute difference of differences between consecutive numbers of a column

In [32]:
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+



In [33]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window specification
df = df.withColumn("id", F.monotonically_increasing_id())
df.show()
window = Window.orderBy("id")

print (window)

# Generate the lag of the variable
df = df.withColumn("prev_value", F.lag(df.salary).over(window))

# Compute the difference with lag
df = df.withColumn("diff", F.when(F.isnull(df.salary - df.prev_value), 0)
.otherwise(df.salary - df.prev_value)).drop("id")

df.show()

+-------+---+------+-----------+
|   name|age|salary|         id|
+-------+---+------+-----------+
|  James| 34| 55000| 8589934592|
|Michael| 30| 70000|25769803776|
| Robert| 37| 60000|34359738368|
|  Maria| 29| 80000|51539607552|
|    Jen| 32| 65000|60129542144|
+-------+---+------+-----------+

<pyspark.sql.window.WindowSpec object at 0x000001E81E1E4D30>
+-------+---+------+----------+------+
|   name|age|salary|prev_value|  diff|
+-------+---+------+----------+------+
|  James| 34| 55000|      NULL|     0|
|Michael| 30| 70000|     55000| 15000|
| Robert| 37| 60000|     70000|-10000|
|  Maria| 29| 80000|     60000| 20000|
|    Jen| 32| 65000|     80000|-15000|
+-------+---+------+----------+------+



# Date

In [34]:
# example data
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df.show()

+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+



In [35]:
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek

# Convert date string to date format
df = df.withColumn("date_1", to_date(df.date_str_1, 'yyyy-MM-dd'))
df = df.withColumn("date_2", to_date(df.date_str_2, 'dd MMM yyyy'))

df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))

df.show()

+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2|    date_1|    date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01|          18|         20|        138|          5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01|          31|         52|        365|          1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+



# File

## How to get the nrows, ncolumns, datatype of a dataframe

In [36]:
#df = spark.read.csv("./Churn_Modelling.csv", header=True, inferSchema=True)
churn = spark.read.csv('./Churn_Modelling.csv',header=True,escape="\"")
churn.show(5, truncate=False)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance  |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|1        |15634602  |Hargrave|619        |France   |Female|42 |2     |0        |1            |1        |1             |101348.88      |1     |
|2        |15647311  |Hill    |608        |Spain    |Female|41 |1     |83807.86 |1            |0        |1             |112542.58      |0     |
|3        |15619304  |Onio    |502        |France   |Female|42 |8     |159660.8 |3            |1        |0             |113931.57      |1     |
|4        |15701354  |Boni    |699        |France   |Female|39 |1     |0        |2            |0        |0             |93826.63       |

In [37]:
# For number of rows
nrows = df.count()
print("Number of Rows: ", nrows)

# For number of columns
ncols = len(df.columns)
print("Number of Columns: ", ncols)

# For data types of each column
datatypes = df.dtypes
print("Data types: ", datatypes)

Number of Rows:  2
Number of Columns:  8
Data types:  [('date_str_1', 'string'), ('date_str_2', 'string'), ('date_1', 'date'), ('date_2', 'date'), ('day_of_month', 'int'), ('week_number', 'int'), ('day_of_year', 'int'), ('day_of_week', 'int')]


## How to check if a dataframe has any missing values and count of missing values in each column

In [None]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

In [None]:
from pyspark.sql.functions import col, sum, isnan, count, when

missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
print(missing)


has_missing = any(row.asDict().values() for row in missing.collect())
print(has_missing)

missing_count = missing.collect()[0].asDict()
print(missing_count)

In [None]:
df.select([count(when(col("Value").isNull(), 1)).alias("num")]).show()

## How to change the order of columns of a dataframe

In [None]:
# Sample data
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]

# Create DataFrame from the data
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])

# Show the DataFrame
df.show()

In [None]:
new_order = ["Age", "First_Name", "Last_Name"]

# Reorder the columns
df = df.select(*new_order)

# Show the DataFrame with reordered columns
df.show()

## How to format or suppress scientific notations in a PySpark DataFrame

In [None]:
# Assuming you have a DataFrame df and the column you want to format is 'your_column'
df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])

df.show()

In [None]:
from pyspark.sql.functions import format_number

# Determine the number of decimal places you want
decimal_places = 10

df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()

## How to format all the values in a dataframe as percentages

In [None]:
# Sample data
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])

df.show()

### https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lit.html

**pyspark.sql.functions.lit**

Creates a Column of literal value

In [None]:
from pyspark.sql.functions import concat, col, lit

columns = ["numbers_1", "numbers_2"]

for col_name in columns:
df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()