# PySpark on Google Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Tell findspark / pyspark where the JDK and the Spark distribution unpacked by
# the install cell live. Both variables are set in one call.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [None]:
import findspark

# Locate the Spark install (via SPARK_HOME) and make pyspark importable;
# this has to happen before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Render DataFrames eagerly as tables when they are the last expression of a cell.
# (The row limit for that rendering is the integer option
# 'spark.sql.repl.eagerEval.maxNumRows'; it is left at its default here.)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# Libraries

In [None]:
from pyspark.sql import functions as F
from functools import reduce
from operator import add
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
import plotly.express as px
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# CSV: Column Mapping

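We take the source-to-target column mapping from this file: each row maps a column of the source data (`SourceCol`) to the name it should have in the target table (`TargetCol`).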

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the session that is already running.
spark = SparkSession.builder.appName("SESSION").getOrCreate()

# Mapping table: one row per column, with SourceCol -> TargetCol.
data_path = "/content/DATA.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)
df.show()

+---+---------+----------------+
| ID|SourceCol|       TargetCol|
+---+---------+----------------+
|  1|     Name|    CustomerName|
|  2| PostCode|CustomerPostCode|
|  3|      DOB|     CustomerDOB|
|  4|      AGE|     CustomerAge|
+---+---------+----------------+



Source Data

In [None]:
# Raw source records; the mapping table decides which columns are kept and how they are renamed.
data_path = "/content/source.csv"
sourceTable = spark.read.options(header=True, inferSchema=True).csv(data_path)
sourceTable.show()

+-------+--------+------+----------+----+
|   Name|PostCode|noneed|       DOB| AGE|
+-------+--------+------+----------+----+
|   John|    6227|  8622|2004-06-30|19.0|
|  Alice|    1251|  7236|1980-08-27|43.0|
|    Bob|    8655|  2645|1996-02-04|27.0|
|Charlie|    8646|  4924|1995-05-12|28.0|
|  David|    5689|  9089|1958-05-16|65.0|
|   Emma|    4708|  7758|1962-09-02|61.0|
|  Frank|    8416|  4616|1976-11-25|47.0|
|  Grace|    5812|  9282|1973-09-27|50.0|
|   Hank|    5009|  3375|1963-01-24|60.0|
|    Ivy|    1436|  1734|1972-06-20|51.0|
|   Jack|    7507|  6894|1988-02-21|35.0|
|  Katie|    8852|  2323|1997-11-24|26.0|
|    Leo|    4244|  3231|1988-04-11|35.0|
|    Mia|    9867|  9895|1967-04-29|56.0|
| Nathan|    4713|  4725|1994-05-12|29.0|
| Olivia|    2152|  5005|1993-04-15|30.0|
|  Peter|    6097|  5286|2003-08-15|20.0|
|  Quinn|    5612|  3838|2002-06-17|21.0|
| Rachel|    2759|  7767|1974-09-26|49.0|
|    Sam|    5294|  4927|1988-01-13|35.0|
+-------+--------+------+---------

Target Data

# Data Fetching

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
spark = SparkSession.builder.appName("sessionII").getOrCreate()
#SOURCE TABLE
print("Source Table:")
sourceTable.show()
#MAPPING DATA FRAME
print("\nMapping DataFrame:")
df.show()
#QUERY
# Collect the (SourceCol, TargetCol) pairs in a single pass rather than two
# separate collects: two independent Spark jobs give no guarantee that their row
# orders line up, which could pair a source column with the wrong target name.
columnMapping = [
    (row["SourceCol"], row["TargetCol"])
    for row in df.select("SourceCol", "TargetCol").collect()
]
# Fail early with a readable message if the mapping names a column the source lacks.
missingSourceCols = [str(src) for src, _ in columnMapping if src not in sourceTable.columns]
if missingSourceCols:
    raise ValueError(
        f"Mapping refers to columns missing from the source table: {', '.join(missingSourceCols)}"
    )
# Keep only the mapped source columns, renamed to their target names
# (unmapped source columns such as `noneed` are dropped).
targetTable = sourceTable.select([col(src).alias(tgt) for src, tgt in columnMapping])

Source Table:
+-------+--------+------+----------+----+
|   Name|PostCode|noneed|       DOB| AGE|
+-------+--------+------+----------+----+
|   John|    6227|  8622|2004-06-30|19.0|
|  Alice|    1251|  7236|1980-08-27|43.0|
|    Bob|    8655|  2645|1996-02-04|27.0|
|Charlie|    8646|  4924|1995-05-12|28.0|
|  David|    5689|  9089|1958-05-16|65.0|
|   Emma|    4708|  7758|1962-09-02|61.0|
|  Frank|    8416|  4616|1976-11-25|47.0|
|  Grace|    5812|  9282|1973-09-27|50.0|
|   Hank|    5009|  3375|1963-01-24|60.0|
|    Ivy|    1436|  1734|1972-06-20|51.0|
|   Jack|    7507|  6894|1988-02-21|35.0|
|  Katie|    8852|  2323|1997-11-24|26.0|
|    Leo|    4244|  3231|1988-04-11|35.0|
|    Mia|    9867|  9895|1967-04-29|56.0|
| Nathan|    4713|  4725|1994-05-12|29.0|
| Olivia|    2152|  5005|1993-04-15|30.0|
|  Peter|    6097|  5286|2003-08-15|20.0|
|  Quinn|    5612|  3838|2002-06-17|21.0|
| Rachel|    2759|  7767|1974-09-26|49.0|
|    Sam|    5294|  4927|1988-01-13|35.0|
+-------+--------+--

In [None]:
# Print the renamed target table (first 20 rows, long values truncated).
targetTable.show(n=20, truncate=True)

+------------+----------------+-----------+-----------+
|CustomerName|CustomerPostCode|CustomerDOB|CustomerAge|
+------------+----------------+-----------+-----------+
|        John|            6227| 2004-06-30|       19.0|
|       Alice|            1251| 1980-08-27|       43.0|
|         Bob|            8655| 1996-02-04|       27.0|
|     Charlie|            8646| 1995-05-12|       28.0|
|       David|            5689| 1958-05-16|       65.0|
|        Emma|            4708| 1962-09-02|       61.0|
|       Frank|            8416| 1976-11-25|       47.0|
|       Grace|            5812| 1973-09-27|       50.0|
|        Hank|            5009| 1963-01-24|       60.0|
|         Ivy|            1436| 1972-06-20|       51.0|
|        Jack|            7507| 1988-02-21|       35.0|
|       Katie|            8852| 1997-11-24|       26.0|
|         Leo|            4244| 1988-04-11|       35.0|
|         Mia|            9867| 1967-04-29|       56.0|
|      Nathan|            4713| 1994-05-12|     

In [None]:
from pyspark.sql import functions as F

In [None]:
#Input
print("Available functions:")
print("1. Check for Null Values")
print("2. Summary Statistics")
print("3. Unique Values Count")
print("4. Value Counts for Each Column")

functionSelected = input("Enter the number corresponding to the function you want to apply (1, 2, 3, 4, etc.): ").strip()

from pyspark.sql.types import NumericType

#APPLYING
# Each option builds a list of aggregate expressions over targetTable's columns
# and evaluates them in one agg(), producing a single-row result.
result_df = None
if functionSelected == "1":
    # NULL VALUES: number of null entries per column
    null_counts = [F.sum(F.col(c).isNull().cast("int")).alias(f"{c}_null_count") for c in targetTable.columns]
    result_df = targetTable.agg(*null_counts)
elif functionSelected == "2":
    # SUMMARY STATS
    # mean / stddev / quartiles are only meaningful for numeric columns; applying
    # them to string columns (e.g. CustomerName) yields nulls or errors, so those
    # columns only get min / max.
    numericColumns = [f.name for f in targetTable.schema.fields if isinstance(f.dataType, NumericType)]
    allColumns = targetTable.columns
    summary_stats = (
        [F.mean(c).alias(f"{c}_mean") for c in numericColumns]
        + [F.stddev(c).alias(f"{c}_stddev") for c in numericColumns]
        + [F.min(c).alias(f"{c}_min") for c in allColumns]
        + [F.max(c).alias(f"{c}_max") for c in allColumns]
        + [F.percentile_approx(c, 0.25).alias(f"{c}_q1") for c in numericColumns]
        # Use the function API (not an f-string SQL expr) so column names are never mis-parsed.
        + [F.percentile_approx(c, 0.5).alias(f"{c}_median") for c in numericColumns]
        + [F.percentile_approx(c, 0.75).alias(f"{c}_q3") for c in numericColumns]
    )
    result_df = targetTable.agg(*summary_stats)
elif functionSelected == "3":
    # UNIQUE VAL COUNT: number of distinct values per column
    unique_counts = [F.countDistinct(c).alias(f"{c}_unique_count") for c in targetTable.columns]
    result_df = targetTable.agg(*unique_counts)
elif functionSelected == "4":
    # ALL VAL COUNT: F.count(column) counts the non-null entries of each column
    value_counts = [F.count(F.col(c)).alias(f"{c}_count") for c in targetTable.columns]
    result_df = targetTable.agg(*value_counts)

#RESULTS
# An unrecognised choice leaves result_df as None; report it instead of crashing on None.show().
if result_df is None:
    print(f"Unknown selection {functionSelected!r}; please enter 1, 2, 3 or 4.")
else:
    result_df.show()


Available functions:
1. Check for Null Values
2. Summary Statistics
3. Unique Values Count
4. Value Counts for Each Column


In [None]:
from pyspark.sql import functions as F

In [None]:
columnsSelected = input("Enter the columns you want to select (comma-separated): ")
# Clean the entries: strip whitespace, drop empty ones (e.g. from a trailing comma)
# and duplicates, keeping the order in which they were typed.
columnsSelected = list(dict.fromkeys(name.strip() for name in columnsSelected.split(',') if name.strip()))

#Validation Of Columns
# Raise instead of calling exit(): inside a Jupyter/Colab kernel exit() shuts the
# kernel down, losing the Spark session and every variable defined so far.
if not columnsSelected:
    raise ValueError("No columns selected.")
invalidColumns = set(columnsSelected) - set(targetTable.columns)
if invalidColumns:
    raise ValueError(f"Invalid columns: {', '.join(sorted(invalidColumns))}")

#Query To Select Columns
selectedTable = targetTable.select(*columnsSelected)
#Final Tables with columns
print(f"Tables with selected columns: {', '.join(columnsSelected)}")
selectedTable.show()

# Input for specific function
print("Available functions:")
print("1. Sum")
print("2. Count")
print("3. Subtract")
print("4. Maximum")
print("5. Minimum")
print("6. Mode")

functionSelected = input("Enter the number corresponding to the function you want to apply (1, 2, 3, etc.): ").strip()


def _mode_value(frame, name):
    """Return the most frequent non-null value of column ``name`` in ``frame``.

    Ties are broken by taking the smallest value, so the result is deterministic.
    Returns None when the column contains only nulls.
    """
    top = (
        frame.where(F.col(name).isNotNull())
        .groupBy(name)
        .agg(F.count(F.lit(1)).alias("_freq"))
        .orderBy(F.desc("_freq"), F.asc(name))
        .first()
    )
    return None if top is None else top[name]


# Applying
result_df = None
if functionSelected == "1":
    result_df = selectedTable.agg(*(F.sum(c).alias(c) for c in columnsSelected))
elif functionSelected == "2":
    result_df = selectedTable.agg(*(F.count(c).alias(c) for c in columnsSelected))
elif functionSelected == "3":
    if len(columnsSelected) != 2:
        raise ValueError("Subtract function requires exactly two columns.")
    minuendCol, subtrahendCol = columnsSelected
    result_df = selectedTable.select(
        minuendCol,
        (selectedTable[minuendCol] - selectedTable[subtrahendCol]).alias("result"),
    )
elif functionSelected == "4":
    result_df = selectedTable.agg(*(F.max(c).alias(c) for c in columnsSelected))
elif functionSelected == "5":
    result_df = selectedTable.agg(*(F.min(c).alias(c) for c in columnsSelected))
elif functionSelected == "6":
    # Mode = most frequent value of each selected column, returned as one row with
    # the same column names and types as selectedTable.
    modes = tuple(_mode_value(selectedTable, c) for c in columnsSelected)
    result_df = spark.createDataFrame([modes], schema=selectedTable.schema)

# Result
if result_df is None:
    print(f"Unknown selection {functionSelected!r}; please enter a number from 1 to 6.")
else:
    print("Result after applying the selected function:")
    result_df.show()

Enter the columns you want to select (comma-separated): CustomerAge
Tables with selected columns: CustomerAge
+-----------+
|CustomerAge|
+-----------+
|       19.0|
|       43.0|
|       27.0|
|       28.0|
|       65.0|
|       61.0|
|       47.0|
|       50.0|
|       60.0|
|       51.0|
|       35.0|
|       26.0|
|       35.0|
|       56.0|
|       29.0|
|       30.0|
|       20.0|
|       21.0|
|       49.0|
|       35.0|
+-----------+

Available functions:
1. Sum
2. Count
3. Subtract
4. Maximum
5. Minimum
6. Mode
Enter the number corresponding to the function you want to apply (1, 2, 3, etc.): 5
Result after applying the selected function:
+-----------+
|CustomerAge|
+-----------+
|       19.0|
+-----------+

