In [48]:
import sys
import os
os.environ['HADOOP_HOME'] = "C:\\hadoop"
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import findspark
findspark.init()
from pyspark import SparkFiles

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Pyspark 101 exercises").getOrCreate()
print(spark.version)


3.5.3


In [12]:
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

w = Window.orderBy(monotonically_increasing_id())

df = df.withColumn("index", row_number().over(w) - 1)

df.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+

+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



In [13]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]
rdd = spark.sparkContext.parallelize(list(zip(list1, list2)))
df = rdd.toDF(["Column1", "Column2"])

df.show()

+-------+-------+
|Column1|Column2|
+-------+-------+
|      a|      1|
|      b|      2|
|      c|      3|
|      d|      4|
+-------+-------+



In [15]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
sc = spark.sparkContext

rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

result_rdd_A = rdd_A.subtract(rdd_B)
result_rdd_B = rdd_B.subtract(rdd_A)

result_rdd = result_rdd_A.union(result_rdd_B)

result_list = result_rdd.collect()

print(result_list)

[1, 2, 3, 6, 7, 8]


In [None]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+

Min:  10.0
25th percentile:  20.0
Median:  30.0
75th percentile:  50.0
Max:  86.0


In [None]:
from pyspark.sql import Row

data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

df = spark.createDataFrame(data)

df.show()
df.groupBy("job").count().show()


+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+

+---------+-----+
|      job|count|
+---------+-----+
| Engineer|    4|
|Scientist|    2|
|   Doctor|    1|
+---------+-----+



In [None]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()
from pyspark.sql.functions import col, when

top_2_jobs = df.groupBy('job').count().orderBy('count', ascending=False).limit(2).select('job').rdd.flatMap(lambda x: x).collect()

df = df.withColumn('job', when(col('job').isin(top_2_jobs), col('job')).otherwise('Other'))

df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    Other|
+----+---------+



In [None]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

df_2 = df.dropna(subset=['Value'])

df_2.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B|    3| 456|
+----+-----+----+



In [None]:
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()
for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



In [None]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer

# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))

df.show(5)
num_buckets = 10
quantiles = df.stat.approxQuantile("values", [i/num_buckets for i in range(num_buckets+1)], 0.01)

bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")

df_buck = bucketizer.transform(df)

df_buck.groupBy("buckets").count().show()

df_buck.show(5)

+-------------------+
|             values|
+-------------------+
|  0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows
+-------+-----+
|buckets|count|
+-------+-----+
|    4.0|   10|
|    3.0|   10|
|    6.0|   10|
|    9.0|   12|
|    8.0|   10|
|    2.0|   10|
|    7.0|   10|
|    5.0|   10|
|    0.0|    8|
|    1.0|   10|
+-------+-----+

+-------------------+-------+
|             values|buckets|
+-------------------+-------+
|  0.619189370225301|    6.0|
| 0.5096018842446481|    4.0|
| 0.8325259388871524|    9.0|
|0.26322809041172357|    3.0|
| 0.6702867696264135|    6.0|
+-------------------+-------+
only showing top 5 rows


In [None]:
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])

df.show()

df.cube("category1").count().show()
df.crosstab('category1', 'category2').show()

+---------+---------+
|category1|category2|
+---------+---------+
|        A|        X|
|        A|        Y|
|        A|        X|
|        B|        Y|
|        B|        X|
|        C|        X|
|        C|        X|
|        C|        Y|
+---------+---------+

+---------+-----+
|category1|count|
+---------+-----+
|     NULL|    8|
|        A|    3|
|        B|    2|
|        C|    3|
+---------+-----+

+-------------------+---+---+
|category1_category2|  X|  Y|
+-------------------+---+---+
|                  B|  1|  1|
|                  C|  2|  1|
|                  A|  2|  1|
+-------------------+---+---+



In [None]:
from pyspark.sql.functions import rand, col, when

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

df = df.withColumn("is_multiple_of_3", when(col("random") % 3 == 0, 1).otherwise(0))

df.show()


+---+------+
| id|random|
+---+------+
|  0|     9|
|  1|     8|
|  2|     1|
|  3|     4|
|  4|     5|
|  5|     1|
|  6|    10|
|  7|     1|
|  8|    10|
|  9|     6|
+---+------+

+---+------+----------------+
| id|random|is_multiple_of_3|
+---+------+----------------+
|  0|     9|               1|
|  1|     8|               0|
|  2|     1|               0|
|  3|     4|               0|
|  4|     5|               0|
|  5|     1|               0|
|  6|    10|               0|
|  7|     1|               0|
|  8|    10|               0|
|  9|     6|               1|
+---+------+----------------+



In [None]:
from pyspark.sql.functions import rand, row_number, monotonically_increasing_id
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

pos = [0, 4, 8, 5]

w = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("index", row_number().over(w) - 1)

df.show()
df_filtered = df.filter(df.index.isin(pos))

df_filtered.show()

+---+------+
| id|random|
+---+------+
|  0|     9|
|  1|     8|
|  2|     1|
|  3|     4|
|  4|     5|
|  5|     1|
|  6|    10|
|  7|     1|
|  8|    10|
|  9|     6|
+---+------+

+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     9|    0|
|  1|     8|    1|
|  2|     1|    2|
|  3|     4|    3|
|  4|     5|    4|
|  5|     1|    5|
|  6|    10|    6|
|  7|     1|    7|
|  8|    10|    8|
|  9|     6|    9|
+---+------+-----+

+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     9|    0|
|  4|     5|    4|
|  5|     1|    5|
|  8|    10|    8|
+---+------+-----+



In [None]:
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()
df_A.union(df_B).show()


+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [None]:
data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])

df.show()

df = df.withColumn("squared_error", pow((col("actual") - col("predicted")), 2))
mse = df.agg({"squared_error": "avg"}).collect()[0][0]

print(f"Mean Squared Error (MSE) = {mse}")

+------+---------+
|actual|predicted|
+------+---------+
|     1|        1|
|     2|        4|
|     3|        9|
|     4|       16|
|     5|       25|
+------+---------+

Mean Squared Error (MSE) = 116.8


In [None]:
from pyspark.sql.functions import initcap

data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])
df = df.withColumn("name", initcap(df["name"]))

df.show()

+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



In [None]:
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()

summary = df.summary()
summary.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+

+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  NULL|             32.4|          66000.0|
| stddev|  NULL|3.209361307176242|9617.692030835675|
|    min| James|               29|            55000|
|    25%|  NULL|               30|            60000|
|    50%|  NULL|               32|            65000|
|    75%|  NULL|               34|            70000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+



In [None]:
from pyspark.sql import functions as F

data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

df = df.withColumn('word_length', F.length(df.name))
df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+

+-----+-----------+
| name|word_length|
+-----+-----------+
| john|          4|
|alice|          5|
|  bob|          3|
+-----+-----------+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()
df = df.withColumn("id", F.monotonically_increasing_id())
window = Window.orderBy("id")

df = df.withColumn("prev_value", F.lag(df.salary).over(window))

df = df.withColumn("diff", F.when(F.isnull(df.salary - df.prev_value), 0)
.otherwise(df.salary - df.prev_value)).drop("id")

df.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+

+-------+---+------+----------+------+
|   name|age|salary|prev_value|  diff|
+-------+---+------+----------+------+
|  James| 34| 55000|      NULL|     0|
|Michael| 30| 70000|     55000| 15000|
| Robert| 37| 60000|     70000|-10000|
|  Maria| 29| 80000|     60000| 20000|
|    Jen| 32| 65000|     80000|-15000|
+-------+---+------+----------+------+



In [None]:
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek

data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df.show()
df = df.withColumn("date_1", to_date(df.date_str_1, 'yyyy-MM-dd'))
df = df.withColumn("date_2", to_date(df.date_str_2, 'dd MMM yyyy'))

df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))

df.show()

+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+

+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2|    date_1|    date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01|          18|         20|        138|          5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01|          31|         52|        365|          1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+



In [None]:
from pyspark.sql.functions import expr, col
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])

df.show()

df = df.withColumn('Date', expr("to_date(MonthYear, 'MMM yyyy')"))

df.show()

df = df.withColumn('Date', expr("date_add(date_sub(Date, day(Date) - 1), 3)"))

df.show()

+---------+
|MonthYear|
+---------+
| Jan 2010|
| Feb 2011|
| Mar 2012|
+---------+

+---------+----------+
|MonthYear|      Date|
+---------+----------+
| Jan 2010|2010-01-01|
| Feb 2011|2011-02-01|
| Mar 2012|2012-03-01|
+---------+----------+

+---------+----------+
|MonthYear|      Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+



In [None]:
from pyspark.sql.functions import col, length, translate
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])

df.show()

df_filtered = df.where((length(col('Word')) - length(translate(col('Word'), 'AEIOUaeiou', ''))) >= 2)
df_filtered.show()

+------+
|  Word|
+------+
| Apple|
|Orange|
|  Plan|
|Python|
| Money|
+------+

+------+
|  Word|
+------+
| Apple|
|Orange|
| Money|
+------+



In [None]:
data = ['buying books at amazom.com', 'rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']

df = spark.createDataFrame(data, "string")
df.show(truncate =False)

pattern = "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"

df_filtered = df.filter(F.col("value").rlike(pattern))

df_filtered.show()

+--------------------------+
|value                     |
+--------------------------+
|buying books at amazom.com|
|rameses@egypt.com         |
|matt@t.co                 |
|narendra@modi.com         |
+--------------------------+

+-----------------+
|            value|
+-----------------+
|rameses@egypt.com|
|        matt@t.co|
|narendra@modi.com|
+-----------------+



In [None]:
# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()

pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      1|    US|   5000|
|2021|      1|    EU|   4000|
|2021|      2|    US|   5500|
|2021|      2|    EU|   4500|
|2021|      3|    US|   6000|
|2021|      3|    EU|   5000|
|2021|      4|    US|   7000|
|2021|      4|    EU|   6000|
+----+-------+------+-------+

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      3|5000|6000|
|2021|      4|6000|7000|
+----+-------+----+----+



In [None]:
from pyspark.sql.functions import mean

# Sample data
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]

# Create DataFrame
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)

df.show()

result = df.groupBy("Product").agg(mean("Price").alias("Total_Sales"))
result.show()

+-------+----------+-----+
|OrderID|   Product|Price|
+-------+----------+-----+
|   1001|    Laptop| 1000|
|   1002|     Mouse|   50|
|   1003|    Laptop| 1200|
|   1004|     Mouse|   30|
|   1005|Smartphone|  700|
+-------+----------+-----+

+----------+-----------+
|   Product|Total_Sales|
+----------+-----------+
|    Laptop|     1100.0|
|     Mouse|       40.0|
|Smartphone|      700.0|
+----------+-----------+



In [None]:
from pyspark.sql.functions import expr
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]

# Convert list to DataFrame
df = spark.createDataFrame(data, ["series1", "series2"])

df.show()


vecAssembler = VectorAssembler(inputCols=["series1", "series2"], outputCol="vectors")
df = vecAssembler.transform(df)

df = df.withColumn("squared_diff", expr("POW(series1 - series2, 2)"))

df.agg(expr("SQRT(SUM(squared_diff))").alias("euclidean_distance")).show()

+-------+-------+
|series1|series2|
+-------+-------+
|      1|     10|
|      2|      9|
|      3|      8|
|      4|      7|
|      5|      6|
|      6|      5|
|      7|      4|
|      8|      3|
|      9|      2|
|     10|      1|
+-------+-------+

+------------------+
|euclidean_distance|
+------------------+
| 18.16590212458495|
+------------------+



In [None]:
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StringType, ArrayType
from collections import Counter
#Sample DataFrame
df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])
df.show()

def least_freq_char_replace_spaces(s):
    counter = Counter(s.replace(" ", ""))
    least_freq_char = min(counter, key = counter.get)
    return s.replace(' ', least_freq_char)

udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())

df = spark.createDataFrame([('dbc deb abed gade',)], ["string"])
df.withColumn('modified_string', udf_least_freq_char_replace_spaces(df['string'])).show()

+-----------------+
|           string|
+-----------------+
|dbc deb abed gade|
+-----------------+

+-----------------+-----------------+
|           string|  modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+



In [None]:
from pyspark.sql.functions import expr, explode, sequence, rand

start_date = '2000-01-01'
end_date = '2000-03-04' 

df = spark.range(1).select(
explode(
sequence(
expr(f"date '{start_date}'"),
expr(f"date '{end_date}'"),
expr("interval 1 day")
)
).alias("date")
)

df = df.filter(expr("dayofweek(date) = 7"))
df = df.withColumn("random_numbers", ((rand(seed=42) * 10) + 1).cast("int"))

df.show()

+----------+--------------+
|      date|random_numbers|
+----------+--------------+
|2000-01-01|             6|
|2000-01-08|             1|
|2000-01-15|             7|
|2000-01-22|             7|
|2000-01-29|             2|
|2000-02-05|             4|
|2000-02-12|             3|
|2000-02-19|             8|
|2000-02-26|             1|
|2000-03-04|             5|
+----------+--------------+



In [None]:
url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling.csv"

spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)


df.show(5, truncate=False)

nrows = df.count()
print("Number of Rows: ", nrows)

ncols = len(df.columns)
print("Number of Columns: ", ncols)

datatypes = df.dtypes
print("Data types: ", datatypes)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance  |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|1        |15634602  |Hargrave|619        |France   |Female|42 |2     |0.0      |1            |1        |1             |101348.88      |1     |
|2        |15647311  |Hill    |608        |Spain    |Female|41 |1     |83807.86 |1            |0        |1             |112542.58      |0     |
|3        |15619304  |Onio    |502        |France   |Female|42 |8     |159660.8 |3            |1        |0             |113931.57      |1     |
|4        |15701354  |Boni    |699        |France   |Female|39 |1     |0.0      |2            |0        |0             |93826.63       |

In [None]:
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])

df.show()

old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]

for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)

df.show()

+-----+---+---+
| name|age|qty|
+-----+---+---+
|Alice|  1| 30|
|  Bob|  2| 35|
+-----+---+---+

+-----+--------+--------+
| name|user_age|user_qty|
+-----+--------+--------+
|Alice|       1|      30|
|  Bob|       2|      35|
+-----+--------+--------+



In [None]:
from pyspark.sql.functions import col, sum

df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
has_missing = any(row.asDict().values() for row in missing.collect())
print(has_missing)

missing_count = missing.collect()[0].asDict()
print(missing_count)

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+

True
{'Name': 0, 'Value': 2, 'id': 2}


In [None]:
from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
("A", 1, None),
("B", None, 123 ),
("B", 3, 456),
("D", 6, None),
], ["Name", "var1", "var2"])

df.show()

column_names = ["var1", "var2"]
imputer = Imputer(inputCols= column_names, outputCols= column_names, strategy="mean")
model = imputer.fit(df)
imputed_df = model.transform(df)

imputed_df.show(5)

+----+----+----+
|Name|var1|var2|
+----+----+----+
|   A|   1|NULL|
|   B|NULL| 123|
|   B|   3| 456|
|   D|   6|NULL|
+----+----+----+

+----+----+----+
|Name|var1|var2|
+----+----+----+
|   A|   1| 289|
|   B|   3| 123|
|   B|   3| 456|
|   D|   6| 289|
+----+----+----+



In [None]:
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]

df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])

df.show()

new_order = ["Age", "First_Name", "Last_Name"]

df = df.select(*new_order)
df.show()

+----------+---------+---+
|First_Name|Last_Name|Age|
+----------+---------+---+
|      John|      Doe| 30|
|      Jane|      Doe| 25|
|     Alice|    Smith| 22|
+----------+---------+---+

+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30|      John|      Doe|
| 25|      Jane|      Doe|
| 22|     Alice|    Smith|
+---+----------+---------+



In [None]:
from pyspark.sql.functions import format_number

df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])
df.show()

decimal_places = 10

df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()

+---+-----------+
| id|your_column|
+---+-----------+
|  1|    1.23E-7|
|  2|  2.3456E-5|
|  3| 3.45678E-4|
+---+-----------+

+---+------------+
| id| your_column|
+---+------------+
|  1|0.0000001230|
|  2|0.0000234560|
|  3|0.0003456780|
+---+------------+



In [None]:
from pyspark.sql.functions import concat, col, lit

data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])

df.show()

columns = ["numbers_1", "numbers_2"]

for col_name in columns:
    df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|      0.1|     0.08|
|      0.2|     0.06|
|     0.33|     0.02|
+---------+---------+

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|   10.00%|    8.00%|
|   20.00%|    6.00%|
|   33.00%|    2.00%|
+---------+---------+



In [16]:
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]

df = spark.createDataFrame(data, ["Name", "Number"])

df.show()
window = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("rn", row_number().over(window))
n = 5 
df = df.filter((df.rn % n) == 0)
df.show()

+-------+------+
|   Name|Number|
+-------+------+
|  Alice|     1|
|    Bob|     2|
|Charlie|     3|
|   Dave|     4|
|    Eve|     5|
|  Frank|     6|
|  Grace|     7|
| Hannah|     8|
|   Igor|     9|
|   Jack|    10|
+-------+------+

+----+------+---+
|Name|Number| rn|
+----+------+---+
| Eve|     5|  5|
|Jack|    10| 10|
+----+------+---+



In [17]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number
from pyspark.sql import Row

data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]

df = spark.createDataFrame(data)
df.show()

window = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(window))

row = df.filter(df.row_number == n).first()

if row:
    print("Row number:", row.row_number)
    print("Column value:", row.column1)

+---+-------+
| id|column1|
+---+-------+
|  1|      5|
|  2|      8|
|  3|     12|
|  4|      1|
|  5|     15|
|  6|      7|
+---+-------+

Row number: 5
Column value: 5


In [18]:
from pyspark.sql import functions as F
from functools import reduce
data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]

df = spark.createDataFrame(data, ["col1", "col2", "col3"])

df.show()

df = df.withColumn('row_sum', reduce(lambda a, b: a+b, [F.col(x) for x in df.columns]))

df.show()

df = df.filter(F.col('row_sum') > 100)

df.show()

df = df.withColumn('id', F.monotonically_increasing_id())

df_last_2 = df.sort(F.desc('id')).limit(2)

df_last_2.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|  10|  25|  70|
|  40|   5|  20|
|  70|  80| 100|
|  10|   2|  60|
|  40|  50|  20|
+----+----+----+

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
|  10|  25|  70|    105|
|  40|   5|  20|     65|
|  70|  80| 100|    250|
|  10|   2|  60|     72|
|  40|  50|  20|    110|
+----+----+----+-------+

+----+----+----+-------+
|col1|col2|col3|row_sum|
+----+----+----+-------+
|  10|  25|  70|    105|
|  70|  80| 100|    250|
|  40|  50|  20|    110|
+----+----+----+-------+

+----+----+----+-------+------------+
|col1|col2|col3|row_sum|          id|
+----+----+----+-------+------------+
|  40|  50|  20|    110|128849018880|
|  70|  80| 100|    250| 77309411328|
+----+----+----+-------+------------+



In [19]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType

data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

df = spark.createDataFrame(data, ["col1", "col2", "col3"])

df.show()

def min_max_ratio(row):
    return float(min(row)) / max(row)

min_max_ratio_udf = udf(min_max_ratio, FloatType())

df = df.withColumn('min_by_max', min_max_ratio_udf(array(df.columns)))

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
|   7|   8|   9|
|  10|  11|  12|
+----+----+----+

+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
|   1|   2|   3|0.33333334|
|   4|   5|   6| 0.6666667|
|   7|   8|   9| 0.7777778|
|  10|  11|  12| 0.8333333|
+----+----+----+----------+



In [20]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]

df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

df.show()

sort_array_desc = F.udf(lambda arr: sorted(arr), ArrayType(IntegerType()))

df = df.withColumn("row_as_array", sort_array_desc(F.array(df.columns)))
df = df.withColumn("Penultimate", df['row_as_array'].getItem(1))
df = df.drop('row_as_array')

df.show()

+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
|     10|     20|     30|
|     40|     60|     50|
|     80|     70|     90|
+-------+-------+-------+

+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
|     10|     20|     30|         20|
|     40|     60|     50|         50|
|     80|     70|     90|         80|
+-------+-------+-------+-----------+



In [21]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

data = [(1, 2, 3),
(2, 3, 4),
(3, 4, 5),
(4, 5, 6)]

df = spark.createDataFrame(data, ["Col1", "Col2", "Col3"])

df.show()

input_cols = ["Col1", "Col2", "Col3"]
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
df_assembled = assembler.transform(df)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scalerModel = scaler.fit(df_assembled)
df_normalized = scalerModel.transform(df_assembled)
df_normalized = df_normalized.drop('features')

df_normalized.show(truncate=False)

+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|   1|   2|   3|
|   2|   3|   4|
|   3|   4|   5|
|   4|   5|   6|
+----+----+----+

+----+----+----+-------------------------------------------------------------+
|Col1|Col2|Col3|scaled_features                                              |
+----+----+----+-------------------------------------------------------------+
|1   |2   |3   |[-1.161895003862225,-1.161895003862225,-1.161895003862225]   |
|2   |3   |4   |[-0.3872983346207417,-0.3872983346207417,-0.3872983346207417]|
|3   |4   |5   |[0.3872983346207417,0.3872983346207417,0.3872983346207417]   |
|4   |5   |6   |[1.161895003862225,1.161895003862225,1.161895003862225]      |
+----+----+----+-------------------------------------------------------------+



In [22]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col

data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])

df.show()

df = df.withColumn("Match", when(col("Name1") == col("Name2"), True).otherwise(False))

df.show()

+-----+-----+
|Name1|Name2|
+-----+-----+
| John| John|
| Lily| Lucy|
|  Sam|  Sam|
| Lucy| Lily|
+-----+-----+

+-----+-----+-----+
|Name1|Name2|Match|
+-----+-----+-----+
| John| John| true|
| Lily| Lucy|false|
|  Sam|  Sam| true|
| Lucy| Lily|false|
+-----+-----+-----+



In [23]:
from pyspark.sql.functions import lag, lead, to_date
from pyspark.sql.window import Window

# Create a sample DataFrame
data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]

df = spark.createDataFrame(data, ["Date", "Store", "Sales"])

df.show()

df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))

windowSpec = Window.partitionBy("Store").orderBy("Date")

df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))

df.show()

+----------+------+-----+
|      Date| Store|Sales|
+----------+------+-----+
|2023-01-01|Store1|  100|
|2023-01-02|Store1|  150|
|2023-01-03|Store1|  200|
|2023-01-04|Store1|  250|
|2023-01-05|Store1|  300|
|2023-01-01|Store2|   50|
|2023-01-02|Store2|   60|
|2023-01-03|Store2|   80|
|2023-01-04|Store2|   90|
|2023-01-05|Store2|  120|
+----------+------+-----+

+----------+------+-----+---------+----------+
|      Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1|  100|     NULL|       150|
|2023-01-02|Store1|  150|      100|       200|
|2023-01-03|Store1|  200|      150|       250|
|2023-01-04|Store1|  250|      200|       300|
|2023-01-05|Store1|  300|      250|      NULL|
|2023-01-01|Store2|   50|     NULL|        60|
|2023-01-02|Store2|   60|       50|        80|
|2023-01-03|Store2|   80|       60|        90|
|2023-01-04|Store2|   90|       80|       120|
|2023-01-05|Store2|  120|       90|      NULL|
+----------+------+-----

In [24]:
from pyspark.sql.functions import col

data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

# Print DataFrame
df.show()

columns = df.columns

df_single = None

for c in columns:
    if df_single is None:
        df_single = df.select(col(c).alias("single_column"))
    else:
        df_single = df_single.union(df.select(col(c).alias("single_column")))

frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)

frequency_table.show()

+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
|      1|      2|      3|
|      2|      3|      4|
|      1|      2|      3|
|      4|      5|      6|
|      2|      3|      4|
+-------+-------+-------+

+-------------+-----+
|single_column|count|
+-------------+-----+
|            2|    4|
|            3|    4|
|            4|    3|
|            1|    2|
|            5|    1|
|            6|    1|
+-------------+-----+



In [25]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.functions import when, col

data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()

w = Window.orderBy(monotonically_increasing_id())

df = df.withColumn("id", row_number().over(w) - 1)

df = df.select([when(col("id") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])

df = df.withColumn("id", row_number().over(w) - 1)
df = df.withColumn("id_2", df.count() - 1 - df["id"])

df_with_diag_zero = df.select([when(col("id_2") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])

df_with_diag_zero.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    1|    2|    3|    4|
|    2|    3|    4|    5|
|    1|    2|    3|    4|
|    4|    5|    6|    7|
+-----+-----+-----+-----+

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    0|    2|    3|    0|
|    2|    0|    0|    5|
|    1|    0|    0|    4|
|    0|    5|    6|    0|
+-----+-----+-----+-----+



In [26]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])

# Print DataFrame
df.show()

w = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("id", row_number().over(w) - 1)
df_2 = df.orderBy("id", ascending=False).drop("id")
df_2.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    1|    2|    3|    4|
|    2|    3|    4|    5|
|    3|    4|    5|    6|
|    4|    5|    6|    7|
+-----+-----+-----+-----+

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    4|    5|    6|    7|
|    3|    4|    5|    6|
|    2|    3|    4|    5|
|    1|    2|    3|    4|
+-----+-----+-----+-----+



In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder


data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
columns = ["Categories", "Value"]
df = spark.createDataFrame(data, columns)
df.show()

indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df)

indexed_df = indexerModel.transform(df)

encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df = encoded_df.drop("Categories_Indexed")
encoded_df.show(truncate=False)

+----------+-----+
|Categories|Value|
+----------+-----+
|         A|   10|
|         A|   20|
|         B|   30|
|         B|   20|
|         B|   30|
|         C|   40|
|         C|   10|
|         D|   10|
+----------+-----+

+----------+-----+-----------------+
|Categories|Value|Categories_onehot|
+----------+-----+-----------------+
|A         |10   |(3,[1],[1.0])    |
|A         |20   |(3,[1],[1.0])    |
|B         |30   |(3,[0],[1.0])    |
|B         |20   |(3,[0],[1.0])    |
|B         |30   |(3,[0],[1.0])    |
|C         |40   |(3,[2],[1.0])    |
|C         |10   |(3,[2],[1.0])    |
|D         |10   |(3,[],[])        |
+----------+-----+-----------------+



In [28]:
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)

pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      3|5000|6000|
|2021|      4|6000|7000|
+----+-------+----+----+



In [29]:
from pyspark.sql.functions import expr

data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]

# Create DataFrame
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)

df.show()

unpivotExpr = "stack(2, 'EU',EU, 'US', US) as (region,revenue)"

unPivotDF = pivot_df.select("year","quarter", expr(unpivotExpr)).where("revenue is not null")

unPivotDF.show()

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      3|5000|6000|
|2021|      4|6000|7000|
+----+-------+----+----+

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      2|    EU|   4500|
|2021|      2|    US|   5500|
|2021|      1|    EU|   4000|
|2021|      1|    US|   5000|
|2021|      3|    EU|   5000|
|2021|      3|    US|   6000|
|2021|      4|    EU|   6000|
|2021|      4|    US|   7000|
+----+-------+------+-------+



In [30]:
df = spark.createDataFrame([(1, None), (None, 2), (3, 4), (5, None)], ["a", "b"])

df.show()

df_imputed = df.fillna(0)

df_imputed.show()

+----+----+
|   a|   b|
+----+----+
|   1|NULL|
|NULL|   2|
|   3|   4|
|   5|NULL|
+----+----+

+---+---+
|  a|  b|
+---+---+
|  1|  0|
|  0|  2|
|  3|  4|
|  5|  0|
+---+---+



In [31]:
url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv"
spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling_m.csv"), header=True, inferSchema=True)

df.show(2, truncate=False)

from pyspark.sql.types import IntegerType, StringType, NumericType
from pyspark.sql.functions import approxCountDistinct

def detect_continuous_variables(df, distinct_threshold):
    continuous_columns = []
    for column in df.columns:
        dtype = df.schema[column].dataType
        if isinstance(dtype, (IntegerType, NumericType)):
            distinct_count = df.select(approxCountDistinct(column)).collect()[0][0]
        if distinct_count > distinct_threshold:
            continuous_columns.append(column)
    return continuous_columns

continuous_variables = detect_continuous_variables(df, 10)
print(continuous_variables)

+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|1        |15634602  |Hargrave|619        |France   |Female|42 |2     |0.0     |1            |1        |1             |101348.88      |1     |
|2        |15647311  |Hill    |608        |Spain    |Female|41 |1     |83807.86|1            |0        |1             |112542.58      |0     |
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 2 rows





['RowNumber', 'CustomerId', 'Surname', 'CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'EstimatedSalary']


In [32]:
from pyspark.sql.functions import col

data = [(1, 2, 3), (2, 2, 3), (2, 2, 4), (1, 2, 3), (1, 1, 3)]
columns = ["col1", "col2", "col3"]

df = spark.createDataFrame(data, columns)

df.show()
df_grouped = df.groupBy('col2').count()
mode_df = df_grouped.orderBy(col('count').desc()).limit(1)

mode_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   2|   2|   3|
|   2|   2|   4|
|   1|   2|   3|
|   1|   1|   3|
+----+----+----+

+----+-----+
|col2|count|
+----+-----+
|   2|    4|
+----+-----+



In [33]:
import findspark
findspark.init()

print(findspark.find())

import os
import pyspark

print(os.path.dirname(pyspark.__file__))

f:\Anaconda\envs\Python311\Lib\site-packages\pyspark
f:\Anaconda\envs\Python311\Lib\site-packages\pyspark


In [49]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

df = spark.createDataFrame(data, ['Name', 'City'])

df.show()

def to_lower(s):
    if s is not None:
        return s.lower()

udf_to_lower = udf(to_lower, StringType())

df = df.withColumn('City_lower', udf_to_lower(df['City']))

df.show()

+------------+-------------+
|        Name|         City|
+------------+-------------+
|    John Doe|     NEW YORK|
|    Jane Doe|  LOS ANGELES|
|Mike Johnson|      CHICAGO|
|  Sara Smith|SAN FRANCISCO|
+------------+-------------+

+------------+-------------+-------------+
|        Name|         City|   City_lower|
+------------+-------------+-------------+
|    John Doe|     NEW YORK|     new york|
|    Jane Doe|  LOS ANGELES|  los angeles|
|Mike Johnson|      CHICAGO|      chicago|
|  Sara Smith|SAN FRANCISCO|san francisco|
+------------+-------------+-------------+



In [39]:
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

pysparkDF = spark.createDataFrame(data, ['Name', 'City'])

pysparkDF.show()

pandasDF = pysparkDF.toPandas()

print(pandasDF)

+------------+-------------+
|        Name|         City|
+------------+-------------+
|    John Doe|     NEW YORK|
|    Jane Doe|  LOS ANGELES|
|Mike Johnson|      CHICAGO|
|  Sara Smith|SAN FRANCISCO|
+------------+-------------+

           Name           City
0      John Doe       NEW YORK
1      Jane Doe    LOS ANGELES
2  Mike Johnson        CHICAGO
3    Sara Smith  SAN FRANCISCO


In [40]:
print(spark.sparkContext.uiWebUrl)

http://DESKTOP-M1LGPP1:4040


In [41]:
for k,v in spark.sparkContext.getConf().getAll():
    print(f"{k} : {v}")

spark.driver.extraJavaOptions : -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.driver.host : DESKTOP-M1LGPP1
spark.app.submitTime : 1764111439454
spark.app.id : local-1764111440460
spar

In [44]:
from pyspark import SparkConf, SparkContext
sc.stop()
conf = SparkConf()
conf.set("spark.executor.cores", "2")
sc = SparkContext(conf=conf)

In [50]:
df.cache()
df.unpersist()

DataFrame[Name: string, City: string, City_lower: string]

In [51]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

AttributeError: 'list' object has no attribute 'randomSplit'

In [52]:
data = spark.createDataFrame([
(0, 1.0, -1.0),
(1, 2.0, 1.0),
(1, 3.0, -2.0),
(0, 4.0, 1.0),
(1, 5.0, -3.0),
(0, 6.0, 2.0),
(1, 7.0, -1.0),
(0, 8.0, 3.0),
(1, 9.0, -2.0),
(0, 10.0, 2.0),
(1, 11.0, -3.0),
(0, 12.0, 1.0),
(1, 13.0, -1.0),
(0, 14.0, 2.0),
(1, 15.0, -2.0),
(0, 16.0, 3.0),
(1, 17.0, -3.0),
(0, 18.0, 1.0),
(1, 19.0, -1.0),
(0, 20.0, 2.0)
], ["label", "feat1", "feat2"])

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

vecAssembler = VectorAssembler(inputCols=['feat1', 'feat2'], outputCol="features")
data = vecAssembler.transform(data)

lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(data)

print(f"Coefficients: {str(lr_model.coefficients)}")
print(f"Intercept: {str(lr_model.intercept)}")

Coefficients: [0.020277740475786732,-1.6129609400223655]
Intercept: -0.22092927518295352


In [53]:
data = [('cat',), ('dog',), ('mouse',), ('fish',), ('dog',), ('cat',), ('mouse',)]
df = spark.createDataFrame(data, ["animal"])

df.show()

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')

indexed = indexer.fit(df).transform(df)
indexed.show()

+------+
|animal|
+------+
|   cat|
|   dog|
| mouse|
|  fish|
|   dog|
|   cat|
| mouse|
+------+

+------+-----------+
|animal|animalIndex|
+------+-----------+
|   cat|        0.0|
|   dog|        1.0|
| mouse|        2.0|
|  fish|        3.0|
|   dog|        1.0|
|   cat|        0.0|
| mouse|        2.0|
+------+-----------+



In [54]:
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()

correlation = df.corr("feature1", "feature2")

print("Correlation between feature1 and feature2 :", correlation)

+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
|       5|      10|      25|
|       6|      15|      35|
|       7|      25|      30|
|       8|      20|      60|
|       9|      30|      70|
+--------+--------+--------+

Correlation between feature1 and feature2 : 0.9


In [55]:
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()

from pyspark.ml.stat import Correlation

vector_assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data_vector = vector_assembler.transform(df).select("features")

correlation_matrix = Correlation.corr(data_vector, "features").head()[0]

print(correlation_matrix)

+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
|       5|      10|      25|
|       6|      15|      35|
|       7|      25|      30|
|       8|      20|      60|
|       9|      30|      70|
+--------+--------+--------+

DenseMatrix([[1.        , 0.9       , 0.91779992],
             [0.9       , 1.        , 0.67837385],
             [0.91779992, 0.67837385, 1.        ]])


In [56]:
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)

df.show()

from pyspark.sql import SparkSession, Row
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

def calculate_vif(data, features):
    vif_dict = {}

    for feature in features:
        non_feature_cols = [col for col in features if col != feature]
        assembler = VectorAssembler(inputCols=non_feature_cols, outputCol="features")
        lr = LinearRegression(featuresCol='features', labelCol=feature)

    model = lr.fit(assembler.transform(data))
    vif = 1 / (1 - model.summary.r2)

    vif_dict[feature] = vif

    return vif_dict

features = ['feature1', 'feature2', 'feature3']
vif_values = calculate_vif(df, features)

for feature, vif in vif_values.items():
    print(f'VIF for {feature}: {vif}')

+--------+--------+--------+
|feature1|feature2|feature3|
+--------+--------+--------+
|       5|      10|      25|
|       6|      15|      35|
|       7|      25|      30|
|       8|      20|      60|
|       9|      30|      70|
+--------+--------+--------+

VIF for feature3: 23.30468749999992


In [57]:
# Create a sample dataframe
data = [(1, 0, 0, 1, 1),
(2, 0, 1, 0, 0),
(3, 1, 0, 0, 0),
(4, 0, 0, 1, 1),
(5, 0, 1, 1, 0)]

df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])

df.show()

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
df = assembler.transform(df)

from pyspark.ml.stat import ChiSquareTest

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

+---+--------+--------+--------+-----+
| id|feature1|feature2|feature3|label|
+---+--------+--------+--------+-----+
|  1|       0|       0|       1|    1|
|  2|       0|       1|       0|    0|
|  3|       1|       0|       0|    0|
|  4|       0|       0|       1|    1|
|  5|       0|       1|       1|    0|
+---+--------+--------+--------+-----+

pValues: [0.36131042852617856,0.13603712811414348,0.1360371281141436]
degreesOfFreedom: [1, 1, 1]
statistics: [0.8333333333333335,2.2222222222222228,2.2222222222222223]


In [58]:
# Sample data
data = [("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)]

# Create DataFrame
df = spark.createDataFrame(data, ["Employee", "Department", "Salary"])

df.show()

from pyspark.sql.functions import stddev

salary_stddev = df.select(stddev("Salary").alias("stddev"))

salary_stddev.show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|   James|     Sales|  3000|
| Michael|     Sales|  4600|
|  Robert|     Sales|  4100|
|   Maria|   Finance|  3000|
|   James|     Sales|  3000|
|   Scott|   Finance|  3300|
|     Jen|   Finance|  3900|
|    Jeff| Marketing|  3000|
|   Kumar| Marketing|  2000|
|    Saif|     Sales|  4100|
+--------+----------+------+

+-----------------+
|           stddev|
+-----------------+
|765.9416862050705|
+-----------------+



In [59]:
# Create a sample dataframe
data = [("John", "Doe", None),
(None, "Smith", "New York"),
("Mike", "Smith", None),
("Anna", "Smith", "Boston"),
(None, None, None)]

df = spark.createDataFrame(data, ["FirstName", "LastName", "City"])

df.show()

total_rows = df.count()

for column in df.columns:
    null_values = df.filter(df[column].isNull()).count()
    missing_percentage = (null_values / total_rows) * 100
print(f"Missing values in {column}: {missing_percentage}%")

+---------+--------+--------+
|FirstName|LastName|    City|
+---------+--------+--------+
|     John|     Doe|    NULL|
|     NULL|   Smith|New York|
|     Mike|   Smith|    NULL|
|     Anna|   Smith|  Boston|
|     NULL|    NULL|    NULL|
+---------+--------+--------+

Missing values in City: 60.0%


In [60]:
dataframe_names = [name for name, obj in globals().items() if isinstance(obj, pyspark.sql.DataFrame)]

for name in dataframe_names:
    print(name)

_
df
missing
imputed_df
df_last_2
df_assembled
df_normalized
df_single
frequency_table
df_with_diag_zero
df_2
indexed_df
encoded_df
pivot_df
unPivotDF
df_imputed
df_grouped
mode_df
pysparkDF
_50
indexed
data_vector
salary_stddev
