<a href="https://colab.research.google.com/github/DumontHenry/exercise_PySpark/blob/main/PySpark_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark==3.4.1
!pip install findspark==2.0.1

Collecting pyspark==3.4.1
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285386 sha256=5c261d62fadde1757d87d69c80cd6839aa0defe8dc870ed95a70e08245465e85
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
Collecting findspark==2.0.1
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark 101 Exercises").getOrCreate()
print(spark.version)

3.4.1


# https://sparkbyexamples.com/
**example pyspark code**


#https://www.machinelearningplus.com/pyspark/introduction-to-pyspark/
**Introduction to pyspark**

In [None]:
# withColumn = By using PySpark withColumn() on a DataFrame, we can cast or change the data type of a column.
# withColumn =  PySpark withColumn() function of DataFrame can also be used to change the value of an existing column.$
# withColumn = To add/create a new column, specify the first argument with a name you want your new column to be and use the second argument to assign a value by applying an operation on an existing column
# viwthColumn = In order to create a new column, pass the column name you wanted to the first argument of withColumn() transformation function.


# withColumnRenamed() = Though you cannot rename a column using withColumn, still I wanted to cover this as renaming is one of the common operations we perform on DataFrame. To rename an existing column use withColumnRenamed() function on DataFrame.
#  SparkSession.builder.appName('SparkByExamples.com').getOrCreate() = In PySpark, SparkSession.builder.appName() sets a name for your Spark application. This name is useful for identifying your application in Spark's web UI or logs

In [None]:
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+



In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Applying orderBy() and monotonically_increasing_id()
window_spec = Window.orderBy(monotonically_increasing_id())

# Add a new column "row_number" using row_number() over the specified window
result_df = df.withColumn("Index", row_number().over(window_spec)-1)

# Show the result
result_df.show()

+-------+-----+-----+
|   Name|Value|Index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



In [None]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]

In [None]:
# Create an RDD from the lists and convert it to a DataFrame
rdd = spark.sparkContext.parallelize(list(zip(list1, list2)))
df = rdd.toDF(["Column1", "Column2"])

# Show the DataFrame
df.show()

+-------+-------+
|Column1|Column2|
+-------+-------+
|      a|      1|
|      b|      2|
|      c|      3|
|      d|      4|
+-------+-------+



In [None]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

In [None]:
sc = spark.sparkContext

rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

result_rdd = rdd_A.subtract(rdd_B)
# Collect result
result_list = result_rdd.collect()
print(result_list)

[1, 2, 3]


In [None]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

In [None]:
sc = spark.sparkContext

rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

result_rdd_A = rdd_A.subtract(rdd_B)
result_rdd_B = rdd_B.subtract(rdd_A)

result_rdd = result_rdd_A.union(result_rdd_B)
# Collect result
result_list = result_rdd.collect()
print(result_list)

[1, 2, 3, 8, 6, 7]


In [None]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+



In [None]:
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)
print(quantiles)
print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])

[10.0, 20.0, 30.0, 50.0, 86.0]
Min:  10.0
25th percentile:  20.0
Median:  30.0
75th percentile:  50.0
Max:  86.0


In [None]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

In [None]:
df.select('name', 'job').distinct().collect()
####
df.groupby('job').count().show()
df.groupby('name').count().show()

+---------+-----+
|      job|count|
+---------+-----+
|Scientist|    2|
| Engineer|    4|
|   Doctor|    1|
+---------+-----+

+----+-----+
|name|count|
+----+-----+
|Mary|    1|
|John|    2|
| Bob|    3|
| Sam|    1|
+----+-----+



In [None]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+



In [None]:
from pyspark.sql.functions import col, when

# Get the top 2 most frequent jobs
top_2_jobs= df.groupby('job').count().orderBy('count', ascending=False).limit(2).select('job').rdd.flatMap(lambda x: x).collect()
# Replace all but the top 2 most frequent jobs with 'Other'
df = df.withColumn('job', when(col('job').isin(top_2_jobs), col('job')).otherwise('Other'))
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    Other|
+----+---------+



In [None]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|null|
|   B| null| 123|
|   B|    3| 456|
|   D| null|null|
+----+-----+----+



In [None]:
df_1 = df.dropna(subset=['Value'], how='all')
df_1.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|null|
|   B|    3| 456|
+----+-----+----+



In [None]:
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [None]:
for old_name, new_name in zip(old_names, new_names):
  df = df.withColumnRenamed(old_name, new_name )
df.show()

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



In [None]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer

# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))

df.show(5)

+-------------------+
|             values|
+-------------------+
|  0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows



In [None]:
num_buckets = 10  # Number of buckets
quantiles = df.stat.approxQuantile("values",[i/num_buckets for i in range(num_buckets+1)], 0.01)

# Create the Bucketizer
bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")

# Apply the Bucketizer
df_buck = bucketizer.transform(df)

#Frequency table
df_buck.groupBy("buckets").count().show()

# Show the original and bucketed values
df_buck.show(5)


+-------+-----+
|buckets|count|
+-------+-----+
|    8.0|   10|
|    0.0|    8|
|    7.0|   10|
|    1.0|   10|
|    4.0|   10|
|    3.0|   10|
|    2.0|   10|
|    6.0|   10|
|    5.0|   10|
|    9.0|   12|
+-------+-----+

+-------------------+-------+
|             values|buckets|
+-------------------+-------+
|  0.619189370225301|    4.0|
| 0.5096018842446481|    4.0|
| 0.8325259388871524|    8.0|
|0.26322809041172357|    2.0|
| 0.6702867696264135|    5.0|
+-------------------+-------+
only showing top 5 rows



In [None]:
# Example DataFrame
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])

df.show()


+---------+---------+
|category1|category2|
+---------+---------+
|        A|        X|
|        A|        Y|
|        A|        X|
|        B|        Y|
|        B|        X|
|        C|        X|
|        C|        X|
|        C|        Y|
+---------+---------+



In [None]:
df.cube("category1").count().show()
"""
The cube function in PySpark is used to perform data aggregation by generating all possible combinations of grouping columns, similar to how the groupby function works.
However, cube goes a step further by also including aggregations for all subsets of the specified grouping columns, as well as the grand total (aggregation across all data).
This means that if you apply cube on multiple columns, it will generate aggregations for each individual column, all possible pairs, all possible triplets, and so on, along with the overall total.
For example, in the code you provided, df.cube("category1").count().show(), the cube function is applied to the "category1" column.
This will generate counts for each distinct value in "category1" as well as a total count for all categories combined.
"""

+---------+-----+
|category1|count|
+---------+-----+
|        B|    2|
|     null|    8|
|        A|    3|
|        C|    3|
+---------+-----+



In [None]:
# Contingency table
df.crosstab('category1', 'category2').show()
"""
The crosstab function in PySpark is used to compute a contingency table (also known as a cross-tabulation) for two columns of a DataFrame.
 It essentially creates a frequency table that shows the distribution of values in one column, categorized by the values in another column.
In your code, df.crosstab('category1', 'category2').show(), it generates a table that shows how many times each combination of values from "category1" and "category2" appears in your DataFrame.
This allows you to see the relationship or association between those two categorical variables
"""

+-------------------+---+---+
|category1_category2|  X|  Y|
+-------------------+---+---+
|                  B|  1|  1|
|                  C|  2|  1|
|                  A|  2|  1|
+-------------------+---+---+



In [None]:
from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)
# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))
# Show the DataFrame
df.show()

+---+------+
| id|random|
+---+------+
|  0|     7|
|  1|     6|
|  2|     9|
|  3|     3|
|  4|     7|
|  5|     9|
|  6|     7|
|  7|     3|
|  8|     3|
|  9|     7|
+---+------+



In [None]:
from pyspark.sql.functions import col, when

df = df.withColumn("is_multiple_of_3", when(col("random")%3==0, 1).otherwise(0))
df.show()

+---+------+----------------+
| id|random|is_multiple_of_3|
+---+------+----------------+
|  0|     7|               0|
|  1|     6|               1|
|  2|     9|               1|
|  3|     3|               1|
|  4|     7|               0|
|  5|     9|               1|
|  6|     7|               0|
|  7|     3|               1|
|  8|     3|               1|
|  9|     7|               0|
+---+------+----------------+



In [None]:
from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

pos = [0, 4, 8, 5]

+---+------+
| id|random|
+---+------+
|  0|     7|
|  1|     6|
|  2|     9|
|  3|     3|
|  4|     7|
|  5|     9|
|  6|     7|
|  7|     3|
|  8|     3|
|  9|     7|
+---+------+



In [None]:
pos = [0, 4, 8, 5]

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()

# Filter the DataFrame based on the specified positions
df_filtered = df.filter(df.index.isin(pos))

df_filtered.show()

+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     7|    0|
|  1|     6|    1|
|  2|     9|    2|
|  3|     3|    3|
|  4|     7|    4|
|  5|     9|    5|
|  6|     7|    6|
|  7|     3|    7|
|  8|     3|    8|
|  9|     7|    9|
+---+------+-----+

+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     7|    0|
|  4|     7|    4|
|  5|     9|    5|
|  8|     3|    8|
+---+------+-----+



In [None]:
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [None]:
df= df_A.union(df_B)
df.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [None]:
# Assume you have a DataFrame df with two columns "actual" and "predicted"
# For the sake of example, we'll create a sample DataFrame
data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])

df.show()

+------+---------+
|actual|predicted|
+------+---------+
|     1|        1|
|     2|        4|
|     3|        9|
|     4|       16|
|     5|       25|
+------+---------+



In [None]:
from pyspark.sql.functions import col, when
df= df.withColumn("squarred_error", pow(col("actual")-col("predicted"), 2))
mse = df.agg({"squarred_error": "avg"}).collect()[0][0]
df.show()
print(f"Mean Squared Error (MSE) = {mse}")

+------+---------+--------------+
|actual|predicted|squarred_error|
+------+---------+--------------+
|     1|        1|           0.0|
|     2|        4|           4.0|
|     3|        9|          36.0|
|     4|       16|         144.0|
|     5|       25|         400.0|
+------+---------+--------------+

Mean Squared Error (MSE) = 116.8


# **17. How to convert the first character of each element in a series to uppercase?**

In [None]:
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [None]:
from pyspark.sql.functions import *
df.withColumn("name", initcap(col("name"))).show()

+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



# **18. How to compute summary statistics for all columns in a dataframe**

In [None]:
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+



In [None]:
summary = df.summary()
summary.show()

+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  null|             32.4|          66000.0|
| stddev|  null|3.209361307176242|9617.692030835671|
|    min| James|               29|            55000|
|    25%|  null|               30|            60000|
|    50%|  null|               32|            65000|
|    75%|  null|               34|            70000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+



# **19. How to calculate the number of characters in each word in a column?**

In [None]:
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [None]:
import pyspark.sql.functions as F
df.select(F.length("name")).show()
df.withColumn("length", F.length("name")).show()

+------------+
|length(name)|
+------------+
|           4|
|           5|
|           3|
+------------+

+-----+------+
| name|length|
+-----+------+
| john|     4|
|alice|     5|
|  bob|     3|
+-----+------+



# **20 How to compute difference of differences between consecutive numbers of a column?**

In [None]:
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = df.withColumn("id", F.monotonically_increasing_id())
window = Window.orderBy("id")
df = df.withColumn("data", F.lag(F.col("salary"), 1, 0).over(window))
df = df.withColumn("data", F.col("salary") - F.lag(F.col("salary"), 1, 0).over(window))
df= df.drop("id")
df.show()


+-------+---+------+------+----------+------+
|   name|age|salary|  data|prev_value|  diff|
+-------+---+------+------+----------+------+
|  James| 34| 55000| 55000|      null|     0|
|Michael| 30| 70000| 15000|     55000| 15000|
| Robert| 37| 60000|-10000|     70000|-10000|
|  Maria| 29| 80000| 20000|     60000| 20000|
|    Jen| 32| 65000|-15000|     80000|-15000|
+-------+---+------+------+----------+------+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window specification
df = df.withColumn("id", F.monotonically_increasing_id())
window = Window.orderBy("id")

# Generate the lag of the variable
df = df.withColumn("prev_value", F.lag(df.salary).over(window))

# Compute the difference with lag
df = df.withColumn("diff", F.when(F.isnull(df.salary - df.prev_value), 0)
.otherwise(df.salary - df.prev_value)).drop("id")

df.show()

+-------+---+------+------+----------+------+
|   name|age|salary|  data|prev_value|  diff|
+-------+---+------+------+----------+------+
|  James| 34| 55000| 55000|      null|     0|
|Michael| 30| 70000| 15000|     55000| 15000|
| Robert| 37| 60000|-10000|     70000|-10000|
|  Maria| 29| 80000| 20000|     60000| 20000|
|    Jen| 32| 65000|-15000|     80000|-15000|
+-------+---+------+------+----------+------+



# **21. How to get the day of month, week number, day of year and day of week from a date strings?**

In [None]:
# example data
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df.show()

+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+



In [None]:
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek

# Assuming df is your DataFrame with the date columns
df_updated = df.withColumn("dayofmonth_1", dayofmonth(to_date(df.date_str_1))) \
               .withColumn("weekofyear_1", weekofyear(to_date(df.date_str_1))) \
               .withColumn("dayofyear_1", dayofyear(to_date(df.date_str_1))) \
               .withColumn("dayofweek_1", dayofweek(to_date(df.date_str_1))) \
               .withColumn("dayofmonth_2", dayofmonth(to_date(df.date_str_2, 'dd MMM yyyy'))) \
               .withColumn("weekofyear_2", weekofyear(to_date(df.date_str_2, 'dd MMM yyyy'))) \
               .withColumn("dayofyear_2", dayofyear(to_date(df.date_str_2, 'dd MMM yyyy'))) \
               .withColumn("dayofweek_2", dayofweek(to_date(df.date_str_2, 'dd MMM yyyy')))
df_updated.show()


+----------+-----------+------------+------------+-----------+-----------+------------+------------+-----------+-----------+
|date_str_1| date_str_2|dayofmonth_1|weekofyear_1|dayofyear_1|dayofweek_1|dayofmonth_2|weekofyear_2|dayofyear_2|dayofweek_2|
+----------+-----------+------------+------------+-----------+-----------+------------+------------+-----------+-----------+
|2023-05-18|01 Jan 2010|          18|          20|        138|          5|           1|          53|          1|          6|
|2023-12-31|01 Jan 2010|          31|          52|        365|          1|           1|          53|          1|          6|
+----------+-----------+------------+------------+-----------+-----------+------------+------------+-----------+-----------+



# **22. How to convert year-month string to dates corresponding to the 4th day of the month?**

In [None]:
# example dataframe
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])

df.show()

+---------+
|MonthYear|
+---------+
| Jan 2010|
| Feb 2011|
| Mar 2012|
+---------+



In [None]:
from pyspark.sql.functions import expr, col

# convert YearMonth to date (default to first day of the month)
df = df.withColumn('Date', expr("to_date(MonthYear, 'MMM yyyy')"))

df.show()

df = df.withColumn('Date', expr("date_add(date_sub(Date, day(Date) - 1), 3)"))

df.show()


+---------+----------+
|MonthYear|      Date|
+---------+----------+
| Jan 2010|2010-01-01|
| Feb 2011|2011-02-01|
| Mar 2012|2012-03-01|
+---------+----------+

+---------+----------+
|MonthYear|      Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+



# **23 How to filter words that contain atleast 2 vowels from a series?**

In [None]:
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])

df.show()

+------+
|  Word|
+------+
| Apple|
|Orange|
|  Plan|
|Python|
| Money|
+------+



In [None]:
from pyspark.sql.functions import col, length, translate

# Filter words that contain at least 2 vowels
df_filtered = df.where((length(col('Word')) - length(translate(col('Word'), 'AEIOUaeiou', ''))) >= 2)
df_filtered.show()

+------+
|  Word|
+------+
| Apple|
|Orange|
| Money|
+------+



# **24. How to filter valid emails from a list?**

In [None]:
# Create a list
data = ['buying books at amazom.com', 'rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']

# Convert the list to DataFrame
df = spark.createDataFrame(data, "string")
df.show(truncate =False)

+--------------------------+
|value                     |
+--------------------------+
|buying books at amazom.com|
|rameses@egypt.com         |
|matt@t.co                 |
|narendra@modi.com         |
+--------------------------+



In [None]:
from pyspark.sql import functions as F

pattern ="^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
df_filtered = df.filter(F.col('value').rlike(pattern))
df_filtered.show()

+-----------------+
|            value|
+-----------------+
|rameses@egypt.com|
|        matt@t.co|
|narendra@modi.com|
+-----------------+



# **25. How to Pivot PySpark DataFrame?**


In [None]:
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      1|    US|   5000|
|2021|      1|    EU|   4000|
|2021|      2|    US|   5500|
|2021|      2|    EU|   4500|
|2021|      3|    US|   6000|
|2021|      3|    EU|   5000|
|2021|      4|    US|   7000|
|2021|      4|    EU|   6000|
+----+-------+------+-------+



In [None]:
from pyspark.sql import functions as F
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")
pivot_df.show()

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      4|6000|7000|
|2021|      3|5000|6000|
+----+-------+----+----+



# **26. How to get the mean of a variable grouped by another variable?**

In [None]:
# Sample data
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]

# Create DataFrame
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)

df.show()

+-------+----------+-----+
|OrderID|   Product|Price|
+-------+----------+-----+
|   1001|    Laptop| 1000|
|   1002|     Mouse|   50|
|   1003|    Laptop| 1200|
|   1004|     Mouse|   30|
|   1005|Smartphone|  700|
+-------+----------+-----+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import mean

df1 = df.groupBy("Product").agg(F.avg("Price"))
df1.show()

result = df.groupBy("Product").agg(mean("Price").alias("Total_Sales"))
# Show results
result.show()

+----------+----------+
|   Product|avg(Price)|
+----------+----------+
|    Laptop|    1100.0|
|     Mouse|      40.0|
|Smartphone|     700.0|
+----------+----------+

+----------+-----------+
|   Product|Total_Sales|
+----------+-----------+
|    Laptop|     1100.0|
|     Mouse|       40.0|
|Smartphone|      700.0|
+----------+-----------+



#**27. How to compute the euclidean distance between two columns?**


In [None]:
# Define your series
data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]

# Convert list to DataFrame
df = spark.createDataFrame(data, ["series1", "series2"])

df.show()

+-------+-------+
|series1|series2|
+-------+-------+
|      1|     10|
|      2|      9|
|      3|      8|
|      4|      7|
|      5|      6|
|      6|      5|
|      7|      4|
|      8|      3|
|      9|      2|
|     10|      1|
+-------+-------+



In [None]:
from pyspark.sql.functions import expr
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Convert series to vectors
vecAssembler = VectorAssembler(inputCols=["series1", "series2"], outputCol="vectors")
df = vecAssembler.transform(df)

# Calculate squared differences
df = df.withColumn("squared_diff", expr("POW(series1 - series2, 2)"))

# Sum squared differences and take square root
df.agg(expr("SQRT(SUM(squared_diff))").alias("euclidean_distance")).show()

+------------------+
|euclidean_distance|
+------------------+
| 18.16590212458495|
+------------------+



#**28. How to replace missing spaces in a string with the least frequent character?**


In [None]:
df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])
df.show()

+-----------------+
|           string|
+-----------------+
|dbc deb abed gade|
+-----------------+



In [None]:
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StringType, ArrayType
from collections import Counter

def least_freq_char_replace_spaces(s):
  counter = Counter(s.replace(" ", ""))
  print(counter)
  least_freq_char = min(counter, key = counter.get)
  print(least_freq_char)

  return s.replace(' ', least_freq_char)

udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())

df = spark.createDataFrame([('dbc deb abed gade',)], ["string"])
df.withColumn('modified_string', udf_least_freq_char_replace_spaces(df['string'])).show()

+-----------------+-----------------+
|           string|  modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+



#**29. How to create a TimeSeries starting ‘2000-01-01’ and 10 weekends (saturdays) after that having random numbers as values?**


In [None]:
from pyspark.sql.functions import expr, explode, sequence, rand

# Start date and end date (start + 10 weekends)
start_date = '2000-01-01'
end_date = '2000-03-04' # Calculated manually: 10 weekends (Saturdays) from start date

# Create a DataFrame with one row containing a sequence from start_date to end_date with a 1 day step
df = spark.range(1).select(
explode(
sequence(
expr(f"date '{start_date}'"),
expr(f"date '{end_date}'"),
expr("interval 1 day")
)
).alias("date")
)

# Filter out the weekdays (retain weekends)
df = df.filter(expr("dayofweek(date) = 7")) # 7 corresponds to Saturday in Spark

# Add the random numbers column
#df = df.withColumn("random_numbers", rand()*10)
df = df.withColumn("random_numbers", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

+----------+--------------+
|      date|random_numbers|
+----------+--------------+
|2000-01-01|             9|
|2000-01-08|             7|
|2000-01-15|             3|
|2000-01-22|             3|
|2000-01-29|             7|
|2000-02-05|             9|
|2000-02-12|             9|
|2000-02-19|             8|
|2000-02-26|             4|
|2000-03-04|             1|
+----------+--------------+



#**30. How to get the nrows, ncolumns, datatype of a dataframe?**

In [None]:
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling.csv"

spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)

#df = spark.read.csv("C:/Users/RajeshVaddi/Documents/MLPlus/DataSets/Churn_Modelling.csv", header=True, inferSchema=True)

df.show(5, truncate=False)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance  |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|1        |15634602  |Hargrave|619        |France   |Female|42 |2     |0.0      |1            |1        |1             |101348.88      |1     |
|2        |15647311  |Hill    |608        |Spain    |Female|41 |1     |83807.86 |1            |0        |1             |112542.58      |0     |
|3        |15619304  |Onio    |502        |France   |Female|42 |8     |159660.8 |3            |1        |0             |113931.57      |1     |
|4        |15701354  |Boni    |699        |France   |Female|39 |1     |0.0      |2            |0        |0             |93826.63       |

In [None]:
nrows = df.count()
print(f"Number of rows: {nrows}")

ncolumns = len(df.columns)
print(f"Number of columns: {ncolumns}")

datatype = df.dtypes
print(f"Data types: {datatype}")

Number of rows: 10000
Number of columns: 14
Data types: [('RowNumber', 'int'), ('CustomerId', 'int'), ('Surname', 'string'), ('CreditScore', 'int'), ('Geography', 'string'), ('Gender', 'string'), ('Age', 'int'), ('Tenure', 'int'), ('Balance', 'double'), ('NumOfProducts', 'int'), ('HasCrCard', 'int'), ('IsActiveMember', 'int'), ('EstimatedSalary', 'double'), ('Exited', 'int')]


#**31. How to rename a specific columns in a dataframe?**



In [None]:
# Suppose you have the following DataFrame
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])

df.show()

# Rename lists for specific columns
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]


+-----+---+---+
| name|age|qty|
+-----+---+---+
|Alice|  1| 30|
|  Bob|  2| 35|
+-----+---+---+



In [None]:
#df =df.withColumnRenamed("qty", "quantity")
for old_names,new_names in zip(old_names, new_names):
  df = df.withColumnRenamed(old_names, new_names)
df.show()


+-----+--------+--------+
| name|user_age|quantity|
+-----+--------+--------+
|Alice|       1|      30|
|  Bob|       2|      35|
+-----+--------+--------+



# **32. How to check if a dataframe has any missing values and count of missing values in each column?**

In [None]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|null|
|   B| null| 123|
|   B|    3| 456|
|   D| null|null|
+----+-----+----+



In [None]:
from pyspark.sql.functions import col, sum

missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
has_missing = any(row.asDict().values() for row in missing.collect())
print(has_missing)

missing_count = missing.collect()[0].asDict()
print(missing_count)

True
{'Name': 0, 'Value': 2, 'id': 2}


# **33 How to replace missing values of multiple numeric columns with the mean?**

In [None]:
df = spark.createDataFrame([
("A", 1, None),
("B", None, 123 ),
("B", 3, 456),
("D", 6, None),
], ["Name", "var1", "var2"])

df.show()

+----+----+----+
|Name|var1|var2|
+----+----+----+
|   A|   1|null|
|   B|null| 123|
|   B|   3| 456|
|   D|   6|null|
+----+----+----+



In [None]:
from pyspark.ml.feature import Imputer

column_names = ["var1", "var2"]

# Initialize the Imputer
imputer = Imputer(inputCols= column_names, outputCols= column_names, strategy="mean")

# Fit the Imputer
model = imputer.fit(df)

#Transform the dataset
imputed_df = model.transform(df)

imputed_df.show(5)

"""
Imputer
class pyspark.ml.feature.Imputer(*, strategy: str = 'mean', missingValue: float = nan, inputCols: Optional[List[str]] = None, outputCols: Optional[List[str]] = None, inputCol: Optional[str] = None, outputCol: Optional[str] = None, relativeError: float = 0.001)[source]
Imputation estimator for completing missing values, using the mean, median or mode of the columns in which the missing values are located. The input columns should be of numeric type. Currently Imputer does not support categorical features and possibly creates incorrect values for a categorical feature.

Note that the mean/median/mode value is computed after filtering out missing values. All Null values in the input columns are treated as missing, and so are also imputed. For computing median,

"""

+----+----+----+
|Name|var1|var2|
+----+----+----+
|   A|   1| 289|
|   B|   3| 123|
|   B|   3| 456|
|   D|   6| 289|
+----+----+----+



# **34. How to change the order of columns of a dataframe?**

In [None]:
# Sample data
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]

# Create DataFrame from the data
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])

# Show the DataFrame
df.show()

+----------+---------+---+
|First_Name|Last_Name|Age|
+----------+---------+---+
|      John|      Doe| 30|
|      Jane|      Doe| 25|
|     Alice|    Smith| 22|
+----------+---------+---+



In [None]:
new_order = ["Age", "First_Name", "Last_Name"]
df = df.select(*new_order)
df.show()

+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30|      John|      Doe|
| 25|      Jane|      Doe|
| 22|     Alice|    Smith|
+---+----------+---------+



# **35. How to format or suppress scientific notations in a PySpark DataFrame?**

In [None]:
# Assuming you have a DataFrame df and the column you want to format is 'your_column'
df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])

df.show()

+---+-----------+
| id|your_column|
+---+-----------+
|  1|    1.23E-7|
|  2|  2.3456E-5|
|  3| 3.45678E-4|
+---+-----------+



In [None]:
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import format_number

decimal_places = 10

df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()

+---+------------+
| id| your_column|
+---+------------+
|  1|0.0000000000|
|  2|0.0000000000|
|  3|0.0000000000|
+---+------------+



# **36. How to format all the values in a dataframe as percentages?**



In [None]:
# Sample data
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])

df.show()

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|      0.1|     0.08|
|      0.2|     0.06|
|     0.33|     0.02|
+---------+---------+



In [None]:
from pyspark.sql.functions import concat, col, lit

#create an empty list to add columns name
columns = []

#Loop to add columns name from the dataframes into a list
for field in df.schema.fields:
    columns.append(field.name)

#print(columns) --> testin result : is ok

#function for percentage into column dataframe pyspark
for col_name in columns:
  df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()



+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|   10.00%|    8.00%|
|   20.00%|    6.00%|
|   33.00%|    2.00%|
+---------+---------+



In [None]:
from pyspark.sql.functions import concat, col, lit

columns = ["numbers_1", "numbers_2"]

for col_name in columns:
  df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|   10.00%|    8.00%|
|   20.00%|    6.00%|
|   33.00%|    2.00%|
+---------+---------+



# **37. How to filter every nth row in a dataframe?**


In [None]:
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]

# Create DataFrame
df = spark.createDataFrame(data, ["Name", "Number"])

df.show()

+-------+------+
|   Name|Number|
+-------+------+
|  Alice|     1|
|    Bob|     2|
|Charlie|     3|
|   Dave|     4|
|    Eve|     5|
|  Frank|     6|
|  Grace|     7|
| Hannah|     8|
|   Igor|     9|
|   Jack|    10|
+-------+------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# Define window
window = Window.orderBy(monotonically_increasing_id())

# Add row_number to DataFrame
df = df.withColumn("rn", row_number().over(window))

n = 5 # filter every 5th row

# Filter every nth row
df = df.filter((df.rn % n) == 0)
df.show()


+----+------+---+
|Name|Number| rn|
+----+------+---+
| Eve|     5|  5|
|Jack|    10| 10|
+----+------+---+



# **38. How to get the row number of the nth largest value in a column?**

In [None]:
from pyspark.sql import Row

# Sample Data
data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]

df = spark.createDataFrame(data)
df.show()

+---+-------+
| id|column1|
+---+-------+
|  1|      5|
|  2|      8|
|  3|     12|
|  4|      1|
|  5|     15|
|  6|      7|
+---+-------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number

window = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(window))

n = 3 # We're interested in the 3rd largest value.
row = df.filter(df.row_number == n).first()

if row:
  print("Row number:", row.row_number)
  print("Column value:", row.column1)

Row number: 3
Column value: 8


# **39.How to get the last n rows of a dataframe with row sum > 100?**

In [None]:
# Sample data
data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]

# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Display original DataFrame
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|  10|  25|  70|
|  40|   5|  20|
|  70|  80| 100|
|  10|   2|  60|
|  40|  50|  20|
+----+----+----+



In [None]:
from pyspark.sql import functions as F
from functools import reduce

#In Python, the reduce() function is used to apply a function to an iterable and reduce it to a single cumulative value.
#This function is part of the functools module, which must be imported before you can use reduce()


# Add 'row_sum' column
df = df.withColumn('row_sum', reduce(lambda a, b: a+b, [F.col(x) for x in df.columns]))

# Display DataFrame with 'row_sum'
df.show()

# Filter rows where 'row_sum' > 100
df = df.filter(F.col('row_sum') > 100)

# Display filtered DataFrame
df.show()

# Add 'id' column
df = df.withColumn('id', F.monotonically_increasing_id())

# Get the last 2 rows
df_last_2 = df.sort(F.desc('id')).limit(2)

# Display the last 2 rows
df_last_2.show()

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
|  10|  25|  70|    105|
|  40|   5|  20|     65|
|  70|  80| 100|    250|
|  10|   2|  60|     72|
|  40|  50|  20|    110|
+----+----+----+-------+

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
|  10|  25|  70|    105|
|  70|  80| 100|    250|
|  40|  50|  20|    110|
+----+----+----+-------+

+----+----+----+-------+----------+
|col1|col2|col3|row_sum|        id|
+----+----+----+-------+----------+
|  40|  50|  20|    110|8589934593|
|  70|  80| 100|    250|8589934592|
+----+----+----+-------+----------+



# **40. How to create a column containing the minimum by maximum of each row?**

In [None]:
# Sample Data
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

# Create DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

df.show()


+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
|   7|   8|   9|
|  10|  11|  12|
+----+----+----+



In [None]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType

# Define UDF
def min_max_ratio(row):
  return float(min(row)) / max(row)

min_max_ratio_udf = udf(min_max_ratio, FloatType())

# Apply UDF to create new column
df = df.withColumn('min_by_max', min_max_ratio_udf(array(df.columns)))

df.show()


+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
|   1|   2|   3|0.33333334|
|   4|   5|   6| 0.6666667|
|   7|   8|   9| 0.7777778|
|  10|  11|  12| 0.8333333|
+----+----+----+----------+



# **41. How to create a column that contains the penultimate value in each row?**

In [None]:
data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]

df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

df.show()

#The penultimate value refers to the second-to-last value in a sequence or list. For example,
#if you have a list of numbers [10, 20, 30, 40, 50], the penultimate value would be 40. It’s essentially the value right before the last one.


+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
|     10|     20|     30|
|     40|     60|     50|
|     80|     70|     90|
+-------+-------+-------+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Define UDF to sort array in descending order
sort_array_desc = F.udf(lambda arr: sorted(arr), ArrayType(IntegerType()))

# Create array from columns, sort in descending order and get the penultimate value
df = df.withColumn("row_as_array", sort_array_desc(F.array(df.columns)))
df = df.withColumn("Penultimate", df['row_as_array'].getItem(1))
df = df.drop('row_as_array')

df.show()

"""
UDF’s a.k.a User Defined Functions, If you are coming from SQL background, UDF’s are nothing new to you as most of the traditional RDBMS databases support User Defined Functions,
these functions need to register in the database library and use them on SQL as regular functions

PySpark UDF’s are similar to UDF on traditional databases. In PySpark,
you create a function in a Python syntax and wrap it with PySpark SQL udf() or register it as udf and use it on DataFrame and SQL respectively.
"""

+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
|     10|     20|     30|         20|
|     40|     60|     50|         50|
|     80|     70|     90|         80|
+-------+-------+-------+-----------+



# **42. How to normalize all columns in a dataframe?**

In [None]:
# create a sample dataframe
data = [(1, 2, 3),
(2, 3, 4),
(3, 4, 5),
(4, 5, 6)]

df = spark.createDataFrame(data, ["Col1", "Col2", "Col3"])

df.show()

+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|   1|   2|   3|
|   2|   3|   4|
|   3|   4|   5|
|   4|   5|   6|
+----+----+----+



In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

#define the list of columns to be normalized
columns_to_normalize = ["Col1", "Col2", "Col3"]

#create a vector assembler to combine the columns into a single vector column
assembler = VectorAssembler(inputCols=columns_to_normalize, outputCol="features")
# transform the data
df_assembled  = assembler.transform(df)

#create a standard scaler to normalize the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# fit and transform the data
scalerModel = scaler.fit(df_assembled)
df_normalized = scalerModel.transform(df_assembled)

# if you want to drop the original 'features' column
df_normalized = df_normalized.drop('features')

df_normalized.show(truncate=False)

+----+----+----+-------------------------------------------------------------+
|Col1|Col2|Col3|scaled_features                                              |
+----+----+----+-------------------------------------------------------------+
|1   |2   |3   |[-1.161895003862225,-1.161895003862225,-1.161895003862225]   |
|2   |3   |4   |[-0.3872983346207417,-0.3872983346207417,-0.3872983346207417]|
|3   |4   |5   |[0.3872983346207417,0.3872983346207417,0.3872983346207417]   |
|4   |5   |6   |[1.161895003862225,1.161895003862225,1.161895003862225]      |
+----+----+----+-------------------------------------------------------------+



# **43. How to get the positions where values of two columns match?**

In [None]:
# Create sample DataFrame
data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])
df.show()

+-----+-----+
|Name1|Name2|
+-----+-----+
| John| John|
| Lily| Lucy|
|  Sam|  Sam|
| Lucy| Lily|
+-----+-----+



In [None]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col

df = df.withColumn("position", when(col("Name1") == col("Name2"), 1).otherwise(0))
df.show()

+-----+-----+--------+
|Name1|Name2|position|
+-----+-----+--------+
| John| John|       1|
| Lily| Lucy|       0|
|  Sam|  Sam|       1|
| Lucy| Lily|       0|
+-----+-----+--------+



# **44. How to create lags and leads of a column by group in a dataframe?**

In [None]:
# Create a sample DataFrame
data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]

df = spark.createDataFrame(data, ["Date", "Store", "Sales"])

df.show()

+----------+------+-----+
|      Date| Store|Sales|
+----------+------+-----+
|2023-01-01|Store1|  100|
|2023-01-02|Store1|  150|
|2023-01-03|Store1|  200|
|2023-01-04|Store1|  250|
|2023-01-05|Store1|  300|
|2023-01-01|Store2|   50|
|2023-01-02|Store2|   60|
|2023-01-03|Store2|   80|
|2023-01-04|Store2|   90|
|2023-01-05|Store2|  120|
+----------+------+-----+



In [None]:
from pyspark.sql.functions import lag, lead, to_date
from pyspark.sql.window import Window

# Convert the date from string to date type
df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))

# Create a Window partitioned by Store, ordered by Date
windowSpec = Window.partitionBy("Store").orderBy("Date")

# Create lag and lead variables
df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))

df.show()

+----------+------+-----+---------+----------+
|      Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1|  100|     null|       150|
|2023-01-02|Store1|  150|      100|       200|
|2023-01-03|Store1|  200|      150|       250|
|2023-01-04|Store1|  250|      200|       300|
|2023-01-05|Store1|  300|      250|      null|
|2023-01-01|Store2|   50|     null|        60|
|2023-01-02|Store2|   60|       50|        80|
|2023-01-03|Store2|   80|       60|        90|
|2023-01-04|Store2|   90|       80|       120|
|2023-01-05|Store2|  120|       90|      null|
+----------+------+-----+---------+----------+



#**45. How to get the frequency of unique values in the entire dataframe?**

In [None]:
# Create a numeric DataFrame
data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

# Print DataFrame
df.show()

+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
|      1|      2|      3|
|      2|      3|      4|
|      1|      2|      3|
|      4|      5|      6|
|      2|      3|      4|
+-------+-------+-------+



In [None]:
from pyspark.sql.functions import col

# get column names
columns = df.columns

# stack all columns into a single column
df_single = None

for c in columns:
  if df_single is None:
    df_single = df.select(col(c).alias("single_column"))
  else:
    df_single = df_single.union(df.select(col(c).alias("single_column")))

# generate frequency table
frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)

# show frequency table
frequency_table.show()

+-------------+-----+
|single_column|count|
+-------------+-----+
|            2|    4|
|            3|    4|
|            4|    3|
|            1|    2|
|            5|    1|
|            6|    1|
+-------------+-----+



# **46. How to replace both the diagonals of dataframe with 0?**

In [None]:
# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    1|    2|    3|    4|
|    2|    3|    4|    5|
|    1|    2|    3|    4|
|    4|    5|    6|    7|
+-----+-----+-----+-----+



# **47. How to reverse the rows of a dataframe?**

In [None]:
# Create a numeric DataFrame
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    1|    2|    3|    4|
|    2|    3|    4|    5|
|    3|    4|    5|    6|
|    4|    5|    6|    7|
+-----+-----+-----+-----+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
w = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("row_number", row_number().over(w))
df = df.orderBy(df["row_number"].desc())
df = df.drop("row_number")
df.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    4|    5|    6|    7|
|    3|    4|    5|    6|
|    2|    3|    4|    5|
|    1|    2|    3|    4|
+-----+-----+-----+-----+



# **48. How to create one-hot encodings of a categorical variable (dummy variables)?**

In [None]:
data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
columns = ["Categories", "Value"]

df = spark.createDataFrame(data, columns)
df.show()

+----------+-----+
|Categories|Value|
+----------+-----+
|         A|   10|
|         A|   20|
|         B|   30|
|         B|   20|
|         B|   30|
|         C|   40|
|         C|   10|
|         D|   10|
+----------+-----+



The OneHotEncoder class in PySpark's ml.feature module **is used to convert categorical data (like "apple," "banana," "cherry") into a numerical format that machine learning algorithms can understand**. Specifically,** it transforms each category into a unique binary vector, where:**

**Each category is represented by a vector (array) containing mostly zeros, with a single "1" in the position corresponding to that category**.
For example*, if there are 5 categories, a value of 2.0 could be encoded as [0.0, 0.0, 1.0, 0.0].*
Key points:

dropLast (default=True): This setting means that the last category is not included in the encoding. This is done to avoid redundancy because all categories add up to one vector. For instance, if there are 4 categories and the last one is not included, an input value of 3.0 might be represented as [0.0, 0.0, 0.0] instead of [0.0, 0.0, 0.0, 1.0].

handleInvalid ('error' or 'keep'): This parameter controls what happens when the encoder encounters an invalid category (one that wasn't seen during training):

'error': Throws an error for invalid categories.
'keep': Adds an extra category for invalid values, which, if dropLast is true, will also result in an all-zero vector.
Use Case:
This encoder is useful in preparing categorical features for algorithms that require numerical input, ensuring that each category is uniquely represented without implying any ordinal relationship between categories.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
#from pyspark.sql.types import StringType, StructType, StructField

# StringIndexer Initialization
indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df)

# Transform the DataFrame using the fitted StringIndexer model
indexed_df = indexerModel.transform(df)
#indexed_df.show()

encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df = encoded_df.drop("Categories_Indexed")
encoded_df.show(truncate=False)

# **49. How to Pivot the dataframe (converting rows into columns) ?**

In [None]:
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)

In [None]:
df.groupBy("year","quarter")\
  .pivot("region")\
  .agg(F.sum("revenue"))\
  .show()




+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      4|6000|7000|
|2021|      3|5000|6000|
+----+-------+----+----+



# **50. How to UnPivot the dataframe (converting columns into rows) ?**

In [None]:
# Sample data
data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]

# Create DataFrame
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)

df.show()

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      3|5000|6000|
|2021|      4|6000|7000|
+----+-------+----+----+



Unpivot is a reverse operation, we can achieve by rotating column values into rows values. PySpark SQL doesn’t have unpivot function hence will use the stack() function. Below code converts column countries to row.

In [None]:
from pyspark.sql.functions import expr

unpivotExpr ="stack(2, 'EU' , EU, 'US',US) as (Region,Revenue)"
unPivotDF = df.select("year","quarter",expr(unpivotExpr)).where("Revenue is not null")
unPivotDF.show(truncate=False)
unPivotDF.show()


+----+-------+------+-------+
|year|quarter|Region|Revenue|
+----+-------+------+-------+
|2021|2      |EU    |4500   |
|2021|2      |US    |5500   |
|2021|1      |EU    |4000   |
|2021|1      |US    |5000   |
|2021|3      |EU    |5000   |
|2021|3      |US    |6000   |
|2021|4      |EU    |6000   |
|2021|4      |US    |7000   |
+----+-------+------+-------+

+----+-------+------+-------+
|year|quarter|Region|Revenue|
+----+-------+------+-------+
|2021|      2|    EU|   4500|
|2021|      2|    US|   5500|
|2021|      1|    EU|   4000|
|2021|      1|    US|   5000|
|2021|      3|    EU|   5000|
|2021|      3|    US|   6000|
|2021|      4|    EU|   6000|
|2021|      4|    US|   7000|
+----+-------+------+-------+



# **51. How to impute missing values with Zero?**

In [None]:
# Suppose df is your DataFrame
df = spark.createDataFrame([(1, None), (None, 2), (3, 4), (5, None)], ["a", "b"])
df.show()

+----+----+
|   a|   b|
+----+----+
|   1|null|
|null|   2|
|   3|   4|
|   5|null|
+----+----+



In [None]:
df = df.fillna(0)
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  0|
|  0|  2|
|  3|  4|
|  5|  0|
+---+---+



# **52. How to identify continuous variables in a dataframe and create a list of those column names?**

In [None]:
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv"
spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling_m.csv"), header=True, inferSchema=True)

#df = spark.read.csv("C:/Users/RajeshVaddi/Documents/MLPlus/DataSets/Churn_Modelling_m.csv", header=True, inferSchema=True)

df.show(2, truncate=False)

+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|1        |15634602  |Hargrave|619        |France   |Female|42 |2     |0.0     |1            |1        |1             |101348.88      |1     |
|2        |15647311  |Hill    |608        |Spain    |Female|41 |1     |83807.86|1            |0        |1             |112542.58      |0     |
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 2 rows



# PySpark Variable type Identification – A Comprehensive Guide to Identifying Discrete, Categorical, and Continuous Variables in Data

Let’s Explore what are discrete, categorical, and continuous variables, their identification techniques, and their importance in machine learning and statistical modeling.
Data preprocessing is a critical step in machine learning and statistical modeling. Before diving into model building, it is essential to understand and identify the types of variables present in the dataset.

Furthermore, I will provide a PySpark function to identify variable types in a PySpark DataFrame.

*Types of Variables:*
a. **Discrete Variables**: Discrete variables represent countable data, typically integers. Examples include the number of employees in a company or the number of students in a class.

b. **Categorical Variables:** Categorical variables represent data that can be divided into distinct categories, such as gender or eye color. They can be either nominal (no order) or ordinal (with a meaningful order).

**c. Continuous Variables:** Continuous variables represent data that can take any value within a given range. Examples include height, weight, and temperature.


*How to Identifying Variable types?*

**a. Discrete Variables: **Check if the variable represents countable data and has a limited number of unique values.

**b. Categorical Variables: **Check if the variable can be divided into distinct categories. Ordinal variables can be identified by a meaningful order among categories.

**c. Continuous Variables:** Check if the variable can take any value within a given range, and if the data has a continuous distribution.

**threshold refers to a specific value or limit that, when exceeded, triggers a certain action, response, or event.** Thresholds are commonly used in various areas of IT, such as monitoring systems, data analysis, cybersecurity, and performance optimization.

In [None]:
from pyspark.sql.types import IntegerType, StringType, NumericType
from pyspark.sql.functions import approxCountDistinct

def detect_continuous_variables(df, distinct_threshold):
  """
  Identify continuous variables in a PySpark DataFrame.
  :param df: The input PySpark DataFrame
  :param distinct_threshold: Threshold to qualify as continuous variables - Count of distinct values > distinct_threshold
  :return: A List containing names of continuous variables
  """
  continuous_columns = []
  for column in df.columns:
    dtype = df.schema[column].dataType
    if isinstance(dtype, (IntegerType, NumericType)):
      distinct_count = df.select(approxCountDistinct(column)).collect()[0][0]
      if distinct_count > distinct_threshold:
        continuous_columns.append(column)

  return continuous_columns

continuous_variables = detect_continuous_variables(df, 10)
print(continuous_variables)

['RowNumber', 'CustomerId', 'CreditScore', 'Age', 'Tenure', 'Balance', 'EstimatedSalary']


# **53. How to calculate Mode of a PySpark DataFrame column?**

In [None]:
# Create a sample DataFrame
# https://www.statology.org/pyspark-mode-of-column/
# The mode in statistics is the number that appears most often in a given set of data
data = [(1, 2, 3), (2, 2, 3), (2, 2, 4), (1, 2, 3), (1, 1, 3)]
columns = ["col1", "col2", "col3"]

df = spark.createDataFrame(data, columns)

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   2|   2|   3|
|   2|   2|   4|
|   1|   2|   3|
|   1|   1|   3|
+----+----+----+



In [None]:
from pyspark.sql.functions import col

df_grouped = df.groupBy('col2').count()
mode_df = df_grouped.orderBy(col('count').desc()).limit(1)

mode_df.show()

+----+-----+
|col2|count|
+----+-----+
|   2|    4|
+----+-----+



# **54. How to find installed location of Apache Spark and PySpark?**

In [None]:
import findspark
import os
import pyspark
#Version 1
findspark.init()
print(findspark.find())
#Version 2
print(os.path.dirname(pyspark.__file__))

/usr/local/lib/python3.10/dist-packages/pyspark
/usr/local/lib/python3.10/dist-packages/pyspark


# **55. How to convert a column to lower case using UDF?**

In [None]:
# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

df = spark.createDataFrame(data, ['Name', 'City'])

df.show()

+------------+-------------+
|        Name|         City|
+------------+-------------+
|    John Doe|     NEW YORK|
|    Jane Doe|  LOS ANGELES|
|Mike Johnson|      CHICAGO|
|  Sara Smith|SAN FRANCISCO|
+------------+-------------+



In [None]:
"""
UDF’s a.k.a User Defined Functions, If you are coming from SQL background,
UDF’s are nothing new to you as most of the traditional RDBMS databases support User Defined Functions,
these functions need to register in the database library and use them on SQL as regular functions.

PySpark UDF’s are similar to UDF on traditional databases. In PySpark,
you create a function in a Python syntax and wrap it with PySpark SQL udf() or register it as udf and use it on DataFrame and SQL respectively.
"""
def tolower(s):
  if s is not None:
    return s.lower()

tolower_udf = udf(tolower, StringType())

df = df.withColumn('City', tolower_udf(df['City']))

df.show()

+------------+-------------+
|        Name|         City|
+------------+-------------+
|    John Doe|     new york|
|    Jane Doe|  los angeles|
|Mike Johnson|      chicago|
|  Sara Smith|san francisco|
+------------+-------------+



# **56. How to convert PySpark data frame to pandas dataframe?**

In [None]:
# Create a DataFrame to test
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

pysparkDF = spark.createDataFrame(data, ['Name', 'City'])

pysparkDF.show()

+------------+-------------+
|        Name|         City|
+------------+-------------+
|    John Doe|     NEW YORK|
|    Jane Doe|  LOS ANGELES|
|Mike Johnson|      CHICAGO|
|  Sara Smith|SAN FRANCISCO|
+------------+-------------+



In [None]:
pandas_df = pysparkDF.toPandas()
print(pandas_df)

           Name           City
0      John Doe       NEW YORK
1      Jane Doe    LOS ANGELES
2  Mike Johnson        CHICAGO
3    Sara Smith  SAN FRANCISCO


# **57. How to View PySpark Cluster Details?**

In [None]:
print(spark.sparkContext.uiWebUrl)

http://d305bb521c3d:4040


# **58. How to View PySpark Cluster Configuration Details?**

In [None]:
configurations = spark.sparkContext.getConf().getAll()
for item in configurations: print(item)

# Print all configurations
for k,v in spark.sparkContext.getConf().getAll():
  print(f"{k} : {v}")

('spark.app.name', 'PySpark 101 Exercises')
('spark.driver.host', '30563843d879')
('spark.app.startTime', '1722971858665')
('spark.driver.port', '36461')
('spark.executor.id', 'driver')
('spark.app.id', 'local-1722971864050')
('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.se

# **59. How to restrict the PySpark to use the number of cores in the system?**

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.executor.cores", "2") # set the number of cores you want here
sc = SparkContext(conf=conf)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySpark 101 Exercises, master=local[*]) created by getOrCreate at <ipython-input-2-83eafe77f2d8>:5 

# **60. How to cache PySpark DataFrame or objects and delete cache?**

In [None]:
# Caching the DataFrame
df.cache()

# un-cache or unpersist data using the unpersist() method.
df.unpersist()

DataFrame[Name: string, City: string]

# **61. How to Divide a PySpark DataFrame randomly in a given ratio (0.8, 0.2)?**

In [None]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# **62. How to build logistic regression in PySpark?**

In [None]:
# Create a sample dataframe
data = spark.createDataFrame([
(0, 1.0, -1.0),
(1, 2.0, 1.0),
(1, 3.0, -2.0),
(0, 4.0, 1.0),
(1, 5.0, -3.0),
(0, 6.0, 2.0),
(1, 7.0, -1.0),
(0, 8.0, 3.0),
(1, 9.0, -2.0),
(0, 10.0, 2.0),
(1, 11.0, -3.0),
(0, 12.0, 1.0),
(1, 13.0, -1.0),
(0, 14.0, 2.0),
(1, 15.0, -2.0),
(0, 16.0, 3.0),
(1, 17.0, -3.0),
(0, 18.0, 1.0),
(1, 19.0, -1.0),
(0, 20.0, 2.0)
], ["label", "feat1", "feat2"])

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# convert the feature columns into a single vector column using VectorAssembler
assembler = VectorAssembler(inputCols=["feat1", "feat2"], outputCol="features")
data = assembler.transform(data)

# fit the logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)

# look at the coefficients and intercept of the logistic regression model
print(f"Coefficients: {str(model.coefficients)}")
print(f"Intercept: {str(model.intercept)}")

Coefficients: [0.02027774047578661,-1.612960940022365]
Intercept: -0.22092927518295308


# **63. How to convert the categorical string data into numerical data or index?**

In [23]:
# Create a sample DataFrame
data = [('cat',), ('dog',), ('mouse',), ('fish',), ('dog',), ('cat',), ('mouse',)]
df = spark.createDataFrame(data, ["animal"])
df.show()

+------+
|animal|
+------+
|   cat|
|   dog|
| mouse|
|  fish|
|   dog|
|   cat|
| mouse|
+------+



In [25]:
from pyspark.ml.feature import StringIndexer
# Initialize a StringIndexer
indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')

# Fit the indexer to the DataFrame and transform the data
indexed = indexer.fit(df).transform(df)
indexed.show()

+------+-----------+
|animal|animalIndex|
+------+-----------+
|   cat|        0.0|
|   dog|        1.0|
| mouse|        2.0|
|  fish|        3.0|
|   dog|        1.0|
|   cat|        0.0|
| mouse|        2.0|
+------+-----------+



# **64. How to calculate Correlation of two variables in a DataFrame?**

In [28]:
from pyspark.sql import Row
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()

+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
|       5|      10|      25|
|       6|      15|      35|
|       7|      25|      30|
|       8|      20|      60|
|       9|      30|      70|
+--------+--------+--------+



In [32]:
print(df.stat.corr('feature1', 'feature2'))
print(df.stat.corr('feature1', 'feature3'))
print(df.stat.corr('feature2', 'feature3'))


0.9
0.9177999171377654
0.6783738517974789


# **65. How to calculate Correlation Matrix?**

In [37]:
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()

+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
|       5|      10|      25|
|       6|      15|      35|
|       7|      25|      30|
|       8|      20|      60|
|       9|      30|      70|
+--------+--------+--------+



In [39]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# convert to vector column first
vector_col = "features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

# get correlation matrix
matrix = Correlation.corr(df_vector, vector_col).head()[0]

print(matrix)

DenseMatrix([[1.        , 0.9       , 0.91779992],
             [0.9       , 1.        , 0.67837385],
             [0.91779992, 0.67837385, 1.        ]])


# **66. How to calculate VIF (Variance Inflation Factor ) for set of variables in a DataFrame?**



In [40]:
# Create a sample dataframe
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()

+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
|       5|      10|      25|
|       6|      15|      35|
|       7|      25|      30|
|       8|      20|      60|
|       9|      30|      70|
+--------+--------+--------+



In [45]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
def calculate_vif(data, features):
  vif_dict = {}

  for feature in features:
    non_feature_cols = [col for col in features if col != feature]
    assembler = VectorAssembler(inputCols=non_feature_cols, outputCol="features")
    lr = LinearRegression(featuresCol='features', labelCol=feature)

    model = lr.fit(assembler.transform(data))
    vif = 1 / (1 - model.summary.r2)

    vif_dict[feature] = vif
  return vif_dict

features = ['feature1', 'feature2', 'feature3']
vif_values = calculate_vif(df, features)

for feature, vif in vif_values.items():
  print(f'VIF for {feature}: {vif}')

VIF for feature1: 66.2109375000003
VIF for feature2: 19.33593749999992
VIF for feature3: 23.3046875000001


# **67. How to perform Chi-Square test?**

In [46]:
# Create a sample dataframe
data = [(1, 0, 0, 1, 1),
(2, 0, 1, 0, 0),
(3, 1, 0, 0, 0),
(4, 0, 0, 1, 1),
(5, 0, 1, 1, 0)]

df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])

df.show()

+---+--------+--------+--------+-----+
| id|feature1|feature2|feature3|label|
+---+--------+--------+--------+-----+
|  1|       0|       0|       1|    1|
|  2|       0|       1|       0|    0|
|  3|       1|       0|       0|    0|
|  4|       0|       0|       1|    1|
|  5|       0|       1|       1|    0|
+---+--------+--------+--------+-----+



In [47]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
df = assembler.transform(df)

from pyspark.ml.stat import ChiSquareTest

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

pValues: [0.36131042852617856,0.13603712811414348,0.1360371281141436]
degreesOfFreedom: [1, 1, 1]
statistics: [0.8333333333333335,2.2222222222222228,2.2222222222222223]


The Chi-Square Test is a statistical hypothesis **test used to determine if there is a significant association between two categorical variables in a sample.** *It is based on the difference between the observed frequencies in each category and the frequencies that we would expect to see under the assumption of independence (i.e., no relationship between the variables).*

The resulting test statistic follows a Chi-Square distribution when the null hypothesis of independence is true.

https://www.machinelearningplus.com/pyspark/pyspark-chi-square-test/

# **68. How to calculate the Standard Deviation?**




In [48]:
# Sample data
data = [("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)]

# Create DataFrame
df = spark.createDataFrame(data, ["Employee", "Department", "Salary"])

df.show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|   James|     Sales|  3000|
| Michael|     Sales|  4600|
|  Robert|     Sales|  4100|
|   Maria|   Finance|  3000|
|   James|     Sales|  3000|
|   Scott|   Finance|  3300|
|     Jen|   Finance|  3900|
|    Jeff| Marketing|  3000|
|   Kumar| Marketing|  2000|
|    Saif|     Sales|  4100|
+--------+----------+------+



In [50]:
from pyspark.sql import functions as F
df.select(F.stddev("Salary")).show()

salary_stddev = df.select(F.stddev("Salary").alias("stddev"))
salary_stddev.show()

+-------------------+
|stddev_samp(Salary)|
+-------------------+
|  765.9416862050705|
+-------------------+

+-----------------+
|           stddev|
+-----------------+
|765.9416862050705|
+-----------------+



# **69. How to calculate missing value percentage in each column?**

In [62]:
# Create a sample dataframe
data = [("John", "Doe", None),
(None, "Smith", "New York"),
("Mike", "Smith", None),
("Anna", "Smith", "Boston"),
(None, None, None)]

df = spark.createDataFrame(data, ["FirstName", "LastName", "City"])

df.show()

+---------+--------+--------+
|FirstName|LastName|    City|
+---------+--------+--------+
|     John|     Doe|    null|
|     null|   Smith|New York|
|     Mike|   Smith|    null|
|     Anna|   Smith|  Boston|
|     null|    null|    null|
+---------+--------+--------+



In [65]:
# Calculate the total number of rows in the dataframe
total_rows = df.count()

# For each column calculate the number of null values and then calculate the percentage
for column in df.columns:
  null_values = df.filter(df[column].isNull()).count()
  missing_percentage = (null_values / total_rows) * 100
  print(f"Missing values in {column}: {missing_percentage}%")

Missing values in FirstName: 40.0%
Missing values in LastName: 20.0%
Missing values in City: 60.0%


# **70. How to get the names of DataFrame objects that have been created in an environment?**

In [67]:
dataframe_names = [name for name, obj in globals().items() if isinstance(obj, pyspark.sql.DataFrame)]

for name in dataframe_names:
  print(name)

pysparkDF
train_data
test_data
df
indexed
df_vector
salary_stddev
amount_missing_df
