In [1]:
# Install (and upgrade, via -U) every library listed in requirements.txt.
# To add a dependency, add it to requirements.txt rather than installing it ad hoc here.
# NOTE(review): absolute, machine-specific path — consider a path relative to the repo root.
%pip install -U -r D:/Repo/Spark-SQL/requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

# Point PySpark at the local Spark 3.5.4 distribution and JDK 11 (Windows paths).
# HADOOP_HOME reuses the Spark directory.
# NOTE(review): on Windows, Hadoop file access usually also needs bin/winutils.exe
# under HADOOP_HOME — confirm it is present there.
_LOCAL_ENV = {
    "SPARK_HOME": "C:/spark/spark-3.5.4-bin-hadoop3",
    "JAVA_HOME": "C:/Program Files/Java/jdk-11",
    "HADOOP_HOME": "C:/spark/spark-3.5.4-bin-hadoop3",
}
os.environ.update(_LOCAL_ENV)

In [3]:
# Create (or reuse) the SparkSession. While it is running, the Spark UI is served
# on the default port 4040: http://localhost:4040/jobs/
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

spark = (
    SparkSession.builder
    # "local[*]" uses one worker thread per logical core. Plain "local" uses a single
    # thread, which serializes every task — including the concurrent CSV loads later
    # in this notebook.
    .master("local[*]")
    .appName("Spark-SQL")
    # In local mode the executors run inside the driver JVM, so the driver heap is
    # what actually bounds memory. This only applies when the JVM is first launched
    # (fresh kernel); getOrCreate() reuses an already-running session's JVM.
    .config("spark.driver.memory", "4g")
    # Only honoured when running against a cluster manager; kept for that case.
    .config("spark.executor.memory", "4g")
    # Small fixed shuffle-partition count (Spark's default is 200) for small local data.
    .config("spark.sql.shuffle.partitions", "6")
    .getOrCreate()
)

In [4]:
def read_and_query(file_path, table_name, sep="|", session=None):
    """Load a delimited file with a header row and register it as a temp view.

    Args:
        file_path: Path of the delimited text file to read.
        table_name: Name under which the DataFrame is registered as a temporary
            view, so later ``spark.sql`` queries can refer to it.
        sep: Field delimiter; defaults to ``"|"``.
        session: SparkSession to use; defaults to the notebook-global ``spark``.

    Returns:
        A DataFrame over the registered view (all rows and columns).
    """
    session = spark if session is None else session

    # inferSchema=True is a reader option (not a function): Spark makes an extra
    # pass over the data to guess column types instead of reading every column
    # as a string.
    df = session.read.csv(file_path, header=True, inferSchema=True, sep=sep)

    # Expose the DataFrame to Spark SQL under table_name.
    df.createOrReplaceTempView(table_name)

    # Look the view up by name rather than building SQL text from table_name.
    return session.table(table_name)

In [5]:
import pandas as pd

# Pipe-delimited config manifest: one row per source file to load, giving the
# file path and the temp-view name to register it under. Resolved relative to the
# notebook's working directory ("../" is its parent folder).
path = "../File/Config.csv"

# Read a delimited file into a list of row tuples, e.g. [(file_path, table_name), ...].
def read_csv_as_tuples(path, sep="|"):
    """Read a delimited file with a header row and return its data rows as tuples.

    The header row only supplies column names and is not part of the result.

    Args:
        path: File path (or file-like object) accepted by ``pandas.read_csv``.
        sep: Field delimiter; defaults to ``"|"``.

    Returns:
        A list with one tuple per data row, values in column order. The list is
        empty when the file has only a header.
    """
    df = pd.read_csv(path, header=0, sep=sep)
    # itertuples(index=False, name=None) yields plain tuples without the index.
    # Unlike DataFrame.values, it does not first coerce all columns to one common
    # dtype (which would e.g. turn ints into floats when a float column exists).
    return list(df.itertuples(index=False, name=None))

# Read a delimited file into a DataFrame, e.g. for row-wise iteration with iterrows().
def read_csv(path, sep="|"):
    """Read a delimited file whose first row is the header into a DataFrame.

    Args:
        path: File path (or file-like object) accepted by ``pandas.read_csv``.
        sep: Field delimiter; defaults to ``"|"``.

    Returns:
        A pandas DataFrame with one row per data line.
    """
    return pd.read_csv(path, header=0, sep=sep)

In [6]:
# Read the (file_path, table_name) pairs from the config manifest at `path`
# ("../" in that path means the parent of the notebook's working directory, not a root).
files_and_tables = read_csv_as_tuples(path)

# Register each file as a temp view concurrently; each thread triggers its own Spark
# read, so independent loads can overlap. Loaded DataFrames are kept by view name.
loaded_tables = {}
with ThreadPoolExecutor() as executor:
    # Remember which manifest row each future belongs to, so a failure names the
    # offending file instead of surfacing a bare Spark exception.
    future_to_source = {
        executor.submit(read_and_query, file_path, table_name): (file_path, table_name)
        for file_path, table_name in files_and_tables
    }

    # Collect results as tasks finish; the first failure is re-raised with context
    # (the original exception is chained as __cause__).
    for future in as_completed(future_to_source):
        file_path, table_name = future_to_source[future]
        try:
            loaded_tables[table_name] = future.result()
        except Exception as exc:
            raise RuntimeError(
                f"Failed to load {file_path!r} as temp view {table_name!r}"
            ) from exc

In [7]:
# LEFT JOIN the DQC view to DQC2 on Id; Id_B is NULL for DQC rows with no match.
# NOTE(review): the captured output repeats identical (Id_A, Id_B) pairs, so Id is
# not unique in DQC and/or DQC2 and the join fans out — confirm this is intended.
join_sql = """
                   SELECT A.Id AS Id_A, B.Id AS Id_B
                    FROM DQC A
                    LEFT JOIN DQC2 B ON A.Id = B.Id
                    """
result = spark.sql(join_sql)

# Print at most 50 rows of the joined result.
result.show(50)

+----+----+
|Id_A|Id_B|
+----+----+
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
|   2|   2|
+----+----+
only showing top 50 rows



In [8]:
# Bare expression: Jupyter displays the active SparkSession as this cell's output.
spark

In [9]:
# Read a JSON file (not CSV) into a DataFrame. multiLine lets a JSON record span
# several lines; Spark infers the schema from the JSON values themselves.
sample_json_path = "../File/Json/Sample.json"
df = spark.read.option("multiLine", True).json(sample_json_path)
df.show()

+---+-------+
| id|   name|
+---+-------+
|  1|   Agam|
|  2| Lorita|
|  3|   Safa|
|  4|Zakaria|
+---+-------+



In [10]:
# Persist the JSON-derived DataFrame as Parquet, replacing whatever was previously
# written to this output path.
parquet_file_path = "../File/Parquet"
df.write.parquet(parquet_file_path, mode="overwrite")

In [12]:
# Read the Parquet output back to check the round trip; the rows shown should
# match the ones read from the JSON source.
df = spark.read.format("parquet").load(parquet_file_path)
df.show()

+---+-------+
| id|   name|
+---+-------+
|  1|   Agam|
|  2| Lorita|
|  3|   Safa|
|  4|Zakaria|
+---+-------+

