# 1- Data Ingestion
### Includes:
<ul>
    <li>Metadata Extraction</li>
    <li>Checking Nulls</li>
    <li>Checking Duplication</li>
</ul>

# 2- Transformation
<ul>
    <li>Drop rows with a null <code>value_eur</code></li>
    <li>Sort by <code>value_eur</code> (descending)</li>
    <li>Filter (<code>age</code> &lt;= 25)</li>
</ul>

# 3- Data Loading
<ul>
    <li>Load Data To Oracle DB</li>
</ul>

In [2]:
%%time
import pyspark
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, isnull, when ,col
from pyspark.sql.types import *
import os
import sys
spark_path = r"C:\Spark\spark-3.4.0"
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

# Create a SparkSession with custom configurations

print('------> Starting Spark Engine....')
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.log.level", "WARN") \
    .config("spark.sql.shuffle.partitions", 1000) \
    .getOrCreate()

print('Done')
print(' ')

# Read the data into a DataFrame
print('------> Starting Ingestion....')
data = spark.read.csv(r'C:\Users\hp\PYSPARK-ASSESSMENT\File -- Oracle DB\male_players.csv', header=True, inferSchema=True).cache()
print('Done')
print(' ')


# Write the schema string to a text file
print('------> Exporting schema....')
schema_string = data._jdf.schema().treeString()
with open("metadata/schema.txt", "w") as f:
    f.write(str(schema_string).replace("(nullable = true)",""))
print('Done.')
print(' ')

# Check for null values
print('------> Checking nulls Start....')
null_counts = data.select([count(when(isnull(c), c)).alias(c) for c in data.columns]).toPandas()
null_counts.index = ['null_count']
print('Done')
print(' ')


# Check for duplicates
print('------> Checking duplicates Start....')
duplicate_counts = data.groupBy(data.columns).count().filter('count > 1').toPandas() 
print('Done')
print(' ')


# Save data information and schema to an Excel file
print('------> Exporting data info....')
with pd.ExcelWriter("metadata/data_info.xlsx") as writer:
    data.describe().toPandas().to_excel(writer, sheet_name="Data Info", index=False)
    null_counts.to_excel(writer, sheet_name="Null Counts", index=False)
    duplicate_counts.to_excel(writer, sheet_name="Duplicate Counts", index=False)
print('Done')
print(' ')



print("==========================================================================================================")
print('------> Starting Data Transformation....')
# Drop null values
data_dropped_nulls = data.dropna(how="any", subset=["value_eur"]).persist(StorageLevel.MEMORY_AND_DISK)

# Sort the DataFrame by "value_eur" in descending order
data_sorted = data_dropped_nulls.sort("value_eur", ascending=False).persist(StorageLevel.MEMORY_AND_DISK)

# Filter for ages greater than or equal to 25
data_filtered = data_sorted.filter(data_sorted['age'] <= 25).persist(StorageLevel.MEMORY_AND_DISK)
print('Done')
print(' ')

data2=data_filtered.withColumn("player_traits", data["player_traits"].cast("Long")).persist(StorageLevel.MEMORY_AND_DISK)



print("==========================================================================================================")
print('------> Starting Data Loading....')

dbtable = "FIFA_PLAYER_FILE_2"
user = "system"
password = "Welcome123"
# Change this to your Oracle's details accordingly
server = "localhost"
port = 1521
service_name = 'odi'
jdbcUrl = f"jdbc:oracle:thin:@{server}:{port}:{service_name}"
jdbcDriver = "oracle.jdbc.driver.OracleDriver"



# Create a data frame by writing data to Oracle via JDBC
data2.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .mode('append').save()
print('Done')
print(' ')

print("To Check in Details Visit Spark-UI Below:")
spark

------> Starting Spark Engine....
Done
 
------> Starting Ingestion....
Done
 
------> Exporting schema....
Done.
 
------> Checking nulls Start....
Done
 
------> Checking duplicates Start....
Done
 
------> Exporting data info....
Done
 
------> Starting Data Transformation....
Done
 
------> Starting Data Loading....
Done
 
To Check in Details Visit Spark-UI Below:
CPU times: total: 2.19 s
Wall time: 27min 2s


In [1]:
# Environment check: confirm PySpark's pandas integration and pandas itself import
# cleanly (toPandas() and pd.ExcelWriter rely on pandas), then report the pandas version.
import pyspark.sql.pandas
import pandas as pd

print(pd.__version__)


1.5.3


In [5]:
%%time
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, isnull, when ,col
from pyspark.sql.types import *
import os
import sys
spark_path = r"C:\Spark\spark-3.4.0"
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

# Create a SparkSession with custom configurations
print('------> Starting Spark Engine....')
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.log.level", "WARN") \
    .config("spark.jars", "file:///C:/spark/spark-3.4.0/jars/py4j-0.10.9.7.jar") \
    .getOrCreate()

print('Spark Engine is Ready.')
print(' ')



# Read the data into a DataFrame
print('------> Starting Ingestion....')
data = spark.read.csv(r'C:\Users\hp\PYSPARK-ASSESSMENT\File -- Oracle DB\male_players.csv', header=True, inferSchema=True)
print('Done')
print(' ')

# Change this to your Oracle's details accordingly

dbtable = "test"
user = "system"
password = "Welcome123456"
server = "localhost"
port = 1521
service_name = 'spark'
jdbcUrl = f"jdbc:oracle:thin:@{server}:{port}:{service_name}"
jdbcDriver = "oracle.jdbc.driver.OracleDriver"


data2=data.withColumn("player_traits", data["player_traits"].cast("Long"))
print('------> Starting Loading....')

# Create a data frame by writing data to Oracle via JDBC
data2.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .mode('overwrite').save()
print('Done')
print(' ')

------> Starting Spark Engine....
Spark Engine is Ready.
 
------> Starting Ingestion....
Done
 
------> Starting Loading....
Done
 
CPU times: total: 78.1 ms
Wall time: 8min 33s


In [2]:
# Display the active SparkSession; its rich repr includes a link to the Spark UI.
spark

In [6]:
# Stop the session so the next experiment's builder creates a new session with its
# own configuration instead of getOrCreate() handing back this one.
spark.stop()

# 1-KryoSerializer

In [1]:
%%time
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, isnull, when ,col
from pyspark.sql.types import *
import os
import sys
spark_path = r"C:\Spark\spark-3.4.0"
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

# Create a SparkSession with custom configurations
print('------> Starting Spark Engine....')
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.log.level", "WARN") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

print('Spark Engine is Ready.')
print(' ')



# Read the data into a DataFrame
print('------> Starting Ingestion....')
data = spark.read.csv(r'C:\Users\hp\PYSPARK-ASSESSMENT\File -- Oracle DB\male_players.csv', header=True, inferSchema=True)
print('Done')
print(' ')




# Change this to your Oracle's details accordingly

dbtable = "test1"
user = "system"
password = "Welcome123456"
server = "localhost"
port = 1521
service_name = 'spark'
jdbcUrl = f"jdbc:oracle:thin:@{server}:{port}:{service_name}"
jdbcDriver = "oracle.jdbc.driver.OracleDriver"


data2=data.withColumn("player_traits", data["player_traits"].cast("Long"))
print('------> Starting Loading....')

# Create a data frame by writing data to Oracle via JDBC
data2.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .mode('overwrite').save()
print('Done')
print(' ')

------> Starting Spark Engine....
Spark Engine is Ready.
 
------> Starting Ingestion....
Done
 
------> Starting Loading....
Done
 
CPU times: total: 516 ms
Wall time: 8min 24s


In [2]:
# Stop the session so the next experiment's builder creates a new session with its
# own configuration instead of getOrCreate() handing back this one.
spark.stop()

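# 3-inMemoryColumnarStorage.batchSize = 100,000

> **Note:** `spark.sql.inMemoryColumnarStorage.batchSize` only affects DataFrames cached with `cache()`/`persist()`. The cell below never caches `data`, so any timing difference from the load-only run is not caused by this setting.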

In [1]:
%%time
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, isnull, when ,col
from pyspark.sql.types import *
import os
import sys
spark_path = r"C:\Spark\spark-3.4.0"
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

# Create a SparkSession with custom configurations
print('------> Starting Spark Engine....')
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.log.level", "WARN") \
    .config("spark.jars", "file:///C:/spark/spark-3.4.0/jars/py4j-0.10.9.7.jar") \
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "100000")\
    .getOrCreate()

print('Spark Engine is Ready.')
print(' ')



# Read the data into a DataFrame
print('------> Starting Ingestion....')
data = spark.read.csv(r'C:\Users\hp\PYSPARK-ASSESSMENT\File -- Oracle DB\male_players.csv', header=True, inferSchema=True)
print('Done')
print(' ')

# Change this to your Oracle's details accordingly

dbtable = "test"
user = "system"
password = "Welcome123456"
server = "localhost"
port = 1521
service_name = 'spark'
jdbcUrl = f"jdbc:oracle:thin:@{server}:{port}:{service_name}"
jdbcDriver = "oracle.jdbc.driver.OracleDriver"


data2=data.withColumn("player_traits", data["player_traits"].cast("Long"))
print('------> Starting Loading....')

# Create a data frame by writing data to Oracle via JDBC
data2.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .mode('overwrite').save()
print('Done')
print(' ')

------> Starting Spark Engine....
Spark Engine is Ready.
 
------> Starting Ingestion....
Done
 
------> Starting Loading....
Done
 
CPU times: total: 359 ms
Wall time: 7min 52s


In [3]:
# Stop the session so the next experiment's builder creates a new session with its
# own configuration instead of getOrCreate() handing back this one.
spark.stop()

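# 4-inMemoryColumnarStorage.batchSize = 500,000

> **Note:** `spark.sql.inMemoryColumnarStorage.batchSize` only affects DataFrames cached with `cache()`/`persist()`. The cell below never caches `data`, so this setting is not expected to change the load time. The recorded run below did not complete, so no timing is available.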

In [None]:
%%time
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, isnull, when ,col
from pyspark.sql.types import *
import os
import sys
spark_path = r"C:\Spark\spark-3.4.0"
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

# Create a SparkSession with custom configurations
print('------> Starting Spark Engine....')
spark = SparkSession.builder \
    .appName("MySparkApp1") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.log.level", "WARN") \
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "500000")\
    .config("spark.jars", "file:///C:/spark/spark-3.4.0/jars/py4j-0.10.9.7.jar") \
    .getOrCreate()

print('Spark Engine is Ready.')
print(' ')



# Read the data into a DataFrame
print('------> Starting Ingestion....')
data = spark.read.csv(r'C:\Users\hp\PYSPARK-ASSESSMENT\File -- Oracle DB\male_players.csv'
                      , header=True, inferSchema=True)
print('Done')
print(' ')

# Change this to your Oracle's details accordingly

dbtable = "test"
user = "system"
password = "Welcome123456"
server = "localhost"
port = 1521
service_name = 'spark'
jdbcUrl = f"jdbc:oracle:thin:@{server}:{port}:{service_name}"
jdbcDriver = "oracle.jdbc.driver.OracleDriver"


data2=data.withColumn("player_traits", data["player_traits"].cast("Long"))
print('------> Starting Loading....')

# Create a data frame by writing data to Oracle via JDBC
data2.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .mode('overwrite').save()
print('Done')
print(' ')

------> Starting Spark Engine....
Spark Engine is Ready.
 
------> Starting Ingestion....


In [2]:
# Display the active SparkSession; its rich repr includes a link to the Spark UI.
spark

In [2]:
# Report how many partitions Spark split the ingested CSV DataFrame (`data`) into.
num_partitions = data.rdd.getNumPartitions()
print("The number of partitions is:", num_partitions)

The number of partitions is: 42
