In [35]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import asc
from pyspark.sql.types import *
import os

### Augment data for benchmarking purposes
The provided dataset is too small for a meaningful benchmark. Resample the existing data into millions of rows so that the difference in reading and processing time when using Spark is clearly visible.

In [9]:
# Load the original credit-card dataset into pandas and preview the first rows.
source_csv = "../data/creditcard.csv"
df = pd.read_csv(source_csv)
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


In [34]:
# Summarize the pandas frame: index range, per-column non-null counts and dtypes.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 284807 entries, 0 to 284806
Data columns (total 31 columns):
 #   Column  Non-Null Count   Dtype  
---  ------  --------------   -----  
 0   Time    284807 non-null  float64
 1   V1      284807 non-null  float64
 2   V2      284807 non-null  float64
 3   V3      284807 non-null  float64
 4   V4      284807 non-null  float64
 5   V5      284807 non-null  float64
 6   V6      284807 non-null  float64
 7   V7      284807 non-null  float64
 8   V8      284807 non-null  float64
 9   V9      284807 non-null  float64
 10  V10     284807 non-null  float64
 11  V11     284807 non-null  float64
 12  V12     284807 non-null  float64
 13  V13     284807 non-null  float64
 14  V14     284807 non-null  float64
 15  V15     284807 non-null  float64
 16  V16     284807 non-null  float64
 17  V17     284807 non-null  float64
 18  V18     284807 non-null  float64
 19  V19     284807 non-null  float64
 20  V20     284807 non-null  float64
 21  V21     28

Sample the data (with replacement) up to 20M rows.

In [10]:
# One-off augmentation step, kept commented out; uncomment to regenerate the file.
# It draws 20,000,000 rows from `df` with replacement and writes them to a local CSV.
# No random_state is passed, so a re-run would not reproduce the same sample.
# NOTE(review): presumably the resulting file was uploaded to the bucket that is
# read later in this notebook — confirm.
# df_20M = df.sample(20000000, replace=True)
# df_20M.to_csv("../data/creditfraud_20M.csv", index=False)

### Spark Reading CSV
Build a Spark session that can read a CSV file.

In [11]:
# Point Kubernetes clients at the current user's kubeconfig. The path is resolved
# from the home directory instead of being hardcoded to one user's account, so the
# notebook runs unchanged on other machines.
os.environ["KUBECONFIG"] = os.path.join(os.path.expanduser("~"), ".kube", "config")
# Echo the resolved value in the same format the %env magic prints.
print(f"env: KUBECONFIG={os.environ['KUBECONFIG']}")

env: KUBECONFIG=/home/agung/.kube/config


In [21]:
%%time
# os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.amazonaws:aws-java-sdk-bundle:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"

spark = SparkSession.builder \
    .appName("SparkK8sRead") \
    .master("k8s://https://127.0.0.1:35069") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.executor.instances", 5) \
    .config("spark.kubernetes.container.image", "spark:kube-spark-hadoop-aws") \
    .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent") \
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
    .getOrCreate()

# sc = spark.sparkContext
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "nightingale")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "nightingale")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
# spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
# spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://10.44.101.211:39000/")

                                                                                

+--------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+-----+
|    Time|                 V1|                 V2|                 V3|                V4|                 V5|                V6|                 V7|                 V8|                 V9|                V10|               V11|                V12|                V13|               V14|                V15|                V16|               V17|                V18|                V19|                V20|                V

In [42]:
# Explicit schema for the augmented CSV: Time, V1..V28, Amount and Class, in that
# order, all read as nullable 32-bit floats. Supplying the schema means Spark does
# not have to infer column types from the file.
csv_columns = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount", "Class"]
schema = StructType(
    [StructField(column_name, FloatType(), True) for column_name in csv_columns]
)

# Read the 20M-row file from the s3a bucket and display the first rows.
df_spark_20m = spark.read.csv(
    "s3a://sparkbucket/data/creditfraud_20M.csv",
    header=True,
    schema=schema,
)
df_spark_20m.show()

+--------+-----------+------------+-----------+-----------+------------+-----------+------------+-------------+------------+------------+-----------+------------+-------------+-----------+------------+------------+-----------+-----------+-----------+------------+------------+-----------+------------+------------+-----------+-----------+------------+------------+------+-----+
|    Time|         V1|          V2|         V3|         V4|          V5|         V6|          V7|           V8|          V9|         V10|        V11|         V12|          V13|        V14|         V15|         V16|        V17|        V18|        V19|         V20|         V21|        V22|         V23|         V24|        V25|        V26|         V27|         V28|Amount|Class|
+--------+-----------+------------+-----------+-----------+------------+-----------+------------+-------------+------------+------------+-----------+------------+-------------+-----------+------------+------------+-----------+-----------+------

In [43]:
# Preview ten rows, restricted to the first five columns so the table stays narrow.
leading_columns = df_spark_20m.columns[:5]
df_spark_20m.select(leading_columns).show(n=10)

+--------+-----------+-----------+-----------+-----------+
|    Time|         V1|         V2|         V3|         V4|
+--------+-----------+-----------+-----------+-----------+
|108921.0|-0.09400211|  0.8522104|  0.7446579|-0.13902408|
|  7593.0|-0.46024975|  1.0958843|  1.9548776|  0.5171335|
|113906.0|0.091538526|  0.7075897|-0.28127915| -1.0239733|
| 86140.0| -0.7229428| 0.37467352|  1.8896025|  1.2835553|
| 43446.0| -1.2445023|  0.5746821|   1.645834|-0.84282595|
|153702.0|  1.9158125| -1.0303335| -1.1785386|-0.44622123|
|136247.0|  -1.991231| 0.48790032|-0.08568214| -1.9935523|
|109465.0|-0.30396682|  1.2504085|  1.6099333|   3.530058|
|128410.0|  1.8411667| -0.8680402| -0.3161369| 0.51958805|
|127004.0|  1.9031944|-0.11770485| -2.8124654| 0.32127368|
+--------+-----------+-----------+-----------+-----------+
only showing top 10 rows



In [44]:
# Report the dimensions of the Spark frame. count() runs a Spark job over the
# data, while the column count comes from the frame's column list.
num_rows, num_columns = df_spark_20m.count(), len(df_spark_20m.columns)

print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")



Number of rows: 20000000
Number of columns: 31


                                                                                

In [45]:
%%time
df_spark_20m.orderBy(asc("V1")).select(df_spark_20m.columns[:6]).show(n=20)



+-------+---------+---------+----------+---------+---------+
|   Time|       V1|       V2|        V3|       V4|       V5|
+-------+---------+---------+----------+---------+---------+
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.71573|-6.6052647|16.491217|34.801666|
|39954.0|-56.40751|-72.7

                                                                                

In [46]:
# Print each column's name, type and nullability for the Spark frame.
df_spark_20m.printSchema()

root
 |-- Time: float (nullable = true)
 |-- V1: float (nullable = true)
 |-- V2: float (nullable = true)
 |-- V3: float (nullable = true)
 |-- V4: float (nullable = true)
 |-- V5: float (nullable = true)
 |-- V6: float (nullable = true)
 |-- V7: float (nullable = true)
 |-- V8: float (nullable = true)
 |-- V9: float (nullable = true)
 |-- V10: float (nullable = true)
 |-- V11: float (nullable = true)
 |-- V12: float (nullable = true)
 |-- V13: float (nullable = true)
 |-- V14: float (nullable = true)
 |-- V15: float (nullable = true)
 |-- V16: float (nullable = true)
 |-- V17: float (nullable = true)
 |-- V18: float (nullable = true)
 |-- V19: float (nullable = true)
 |-- V20: float (nullable = true)
 |-- V21: float (nullable = true)
 |-- V22: float (nullable = true)
 |-- V23: float (nullable = true)
 |-- V24: float (nullable = true)
 |-- V25: float (nullable = true)
 |-- V26: float (nullable = true)
 |-- V27: float (nullable = true)
 |-- V28: float (nullable = true)
 |-- Amount: floa

In [19]:
# Stop the Spark session created earlier in this notebook.
spark.stop()

24/06/25 08:20:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
