In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("TestSpark")
    .master("local[*]")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.hadoop.home.dir", "C:\\tmp")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
print("Spark version:", spark.version)

Spark version: 4.0.1


In [9]:
from pyspark.sql import SparkSession 

spark = SparkSession.builder.appName('AMEX_data_load').config('spark.driver.memory', '8g').config('spark.executor.memory', '8g').getOrCreate()

train_df = spark.read.csv(r'C:\potfolio\Default_Prediction\data\raw\train_data.csv', header=True, inferSchema=True)
labels_df = spark.read.csv(r'C:\potfolio\Default_Prediction\data\raw\train_labels.csv', header=True,inferSchema=True)

print("Train Data Count:", train_df.count())
print("Labels Count:", labels_df.count())

Train Data Count: 5531451
Labels Count: 458913


In [10]:
train_df.columns

['customer_ID',
 'S_2',
 'P_2',
 'D_39',
 'B_1',
 'B_2',
 'R_1',
 'S_3',
 'D_41',
 'B_3',
 'D_42',
 'D_43',
 'D_44',
 'B_4',
 'D_45',
 'B_5',
 'R_2',
 'D_46',
 'D_47',
 'D_48',
 'D_49',
 'B_6',
 'B_7',
 'B_8',
 'D_50',
 'D_51',
 'B_9',
 'R_3',
 'D_52',
 'P_3',
 'B_10',
 'D_53',
 'S_5',
 'B_11',
 'S_6',
 'D_54',
 'R_4',
 'S_7',
 'B_12',
 'S_8',
 'D_55',
 'D_56',
 'B_13',
 'R_5',
 'D_58',
 'S_9',
 'B_14',
 'D_59',
 'D_60',
 'D_61',
 'B_15',
 'S_11',
 'D_62',
 'D_63',
 'D_64',
 'D_65',
 'B_16',
 'B_17',
 'B_18',
 'B_19',
 'D_66',
 'B_20',
 'D_68',
 'S_12',
 'R_6',
 'S_13',
 'B_21',
 'D_69',
 'B_22',
 'D_70',
 'D_71',
 'D_72',
 'S_15',
 'B_23',
 'D_73',
 'P_4',
 'D_74',
 'D_75',
 'D_76',
 'B_24',
 'R_7',
 'D_77',
 'B_25',
 'B_26',
 'D_78',
 'D_79',
 'R_8',
 'R_9',
 'S_16',
 'D_80',
 'R_10',
 'R_11',
 'B_27',
 'D_81',
 'D_82',
 'S_17',
 'R_12',
 'B_28',
 'R_13',
 'D_83',
 'R_14',
 'R_15',
 'D_84',
 'R_16',
 'B_29',
 'B_30',
 'S_18',
 'D_86',
 'D_87',
 'R_17',
 'R_18',
 'D_88',
 'B_31',
 'S_

In [11]:
# Join train and labels
joined_df = train_df.join(labels_df, on="customer_ID", how="inner")

# Verify counts
print("Joined Count:", joined_df.count())
print("Unique customer_IDs:", joined_df.select("customer_ID").distinct().count())

# Check class distribution
joined_df.groupBy("target").count().show()


Joined Count: 5531451
Unique customer_IDs: 458913
+------+-------+
|target|  count|
+------+-------+
|     1|1377869|
|     0|4153582|
+------+-------+



In [12]:
joined_df.head()

Row(customer_ID='0012e41fe6caa3ba31b55b3de2030cbb77b01203aeb4a5c6677de5b80f15cebe', S_2=datetime.date(2017, 6, 16), P_2=0.5704483550131871, D_39=0.0047285796259678, B_1=0.0085244243929605, B_2=0.814151014671311, R_1=0.5033651830116288, S_3=None, D_41=0.0006317155827193122, B_3=0.0062672870484008, D_42=0.0192309187742173, D_43=None, D_44=0.127592250277628, B_4=0.1089825660728197, D_45=0.0082944461124573, B_5=0.0095884317110505, R_2=0.0040985204991741, D_46=None, D_47=0.0731927146816847, D_48=0.7752648165334668, D_49=None, B_6=0.0273734554301517, B_7=0.0783072068671461, B_8=1.0084540798421495, D_50=None, D_51=0.0073348816495073, B_9=0.0025867509495788, R_3=0.3052592531420031, D_52=0.075250237798102, P_3=None, B_10=0.0881813685914911, D_53=None, S_5=0.0049186595525309, B_11=0.0029149879470895, S_6=1.005202292684667, D_54=1.0023139770314389, R_4=0.0069732787335638, S_7=None, B_12=0.014914036389869, S_8=0.0070373475286629, D_55=0.127426628828362, D_56=None, B_13=0.0010602971859736, R_5=0.00

In [13]:
joined_df.show()

+--------------------+----------+------------------+------------------+--------------------+------------------+--------------------+------------------+--------------------+------------------+------------------+------------------+--------------------+------------------+------------------+--------------------+--------------------+------------------+------------------+------------------+----+------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+----+--------------------+-------------------+-------------------+------------------+--------------------+------------------+------------------+--------------------+------------------+------------------+------------------+--------------------+------------------+----+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+----+----+

In [14]:
joined_df['D_49'].value_counts

Column<'D_49['value_counts']'>

In [15]:
from pyspark.sql import functions as f 

D_nunique = joined_df.select(f.count_distinct('D_49')).show()

print(D_nunique)

+--------------------+
|count(DISTINCT D_49)|
+--------------------+
|              545534|
+--------------------+

None


In [20]:
joined_df.groupBy("D_49").count().orderBy(f.desc("count")).show(40)

+------------------+-------+
|              D_49|  count|
+------------------+-------+
|              NULL|4985917|
|0.4091256986822271|      1|
|0.0709522768142925|      1|
|0.0724681649547102|      1|
|0.1127581722333256|      1|
|0.4939978824775634|      1|
|0.0370068103069186|      1|
|0.0456079100663258|      1|
|0.1434514887467964|      1|
|0.1592169491698726|      1|
|0.0151114334644457|      1|
|0.4851146258604026|      1|
|0.1412881589764338|      1|
|0.1186413054949995|      1|
|0.7282354557857998|      1|
|0.3462704841744662|      1|
|0.2120838956738395|      1|
| 0.201862507379318|      1|
|0.0508926418374306|      1|
|0.0360607029318904|      1|
|0.3303165295148988|      1|
|0.0645507772036512|      1|
|0.8122659496236043|      1|
|0.1976927040649339|      1|
|  0.07716784838347|      1|
|0.0156042280128295|      1|
|0.1845235115220355|      1|
|0.7133847653929757|      1|
|0.2616692993302802|      1|
|0.1065015487935368|      1|
|0.0199251663514632|      1|
|0.05059879931

In [33]:
filtera = f.col('D_49').isNull()
final = joined_df.filter(filtera)
missing_pct = ( final.count() / joined_df.count()) * 100 
print(f'Missing Percentage in D_49 : {missing_pct:.2f}%')

Missing Percentage in D_49 : 90.14%


In [35]:
def missing_percent(column) :
    filtera = f.col(column).isNull()
    final = joined_df.filter(filtera)
    missing_pct = ( final.count() / joined_df.count()) * 100 
    return f'Missing Percentage in {column} : {missing_pct:.2f}%'

In [37]:
result = [ missing_percent(c) for c in joined_df.columns]

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\Asus\.conda\envs\gpuvenv\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Asus\.conda\envs\gpuvenv\Lib\site-packages\py4j\clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Asus\.conda\envs\gpuvenv\Lib\socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
ERROR:py4j.clientserver:Exception occurred while shutting down connection
Traceback (most recent call last):
  File "c:\Users\Asus\.conda\envs\gpuvenv\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\

KeyboardInterrupt: 