Merge 2 dataframes with uneven columns

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Merge').getOrCreate()

In [20]:
from pyspark.sql.functions import lit

input1DF = spark.read.format('csv').option('header',True).load('sample_data\input1.csv')
input1DF.show()
input2DF = spark.read.format('csv').option('header',True).load('sample_data\input2.csv')
input2DF.show()

input1AddDF = input1DF.withColumn('Gender',lit(None))
input1AddDF.show()

result = input1AddDF.union(input2DF)
result.show()

+--------+---+
|    Name|Age|
+--------+---+
| Monisha| 23|
|  Arvind| 24|
|Rishitha| 24|
|  Anusha| 24|
| Gayatri| 25|
+--------+---+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|     F|
|  Arvind| 24|     M|
|Rishitha| 24|     F|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|  null|
|  Arvind| 24|  null|
|Rishitha| 24|  null|
|  Anusha| 24|  null|
| Gayatri| 25|  null|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|  null|
|  Arvind| 24|  null|
|Rishitha| 24|  null|
|  Anusha| 24|  null|
| Gayatri| 25|  null|
| Monisha| 23|     F|
|  Arvind| 24|     M|
|Rishitha| 24|     F|
+--------+---+------+



Another method

In [21]:
from pyspark.sql.types import *

schema = StructType(
    [
    StructField('Name',StringType(),True), # 3rd option is nullable is true or not
    StructField('Age',IntegerType(),True),
    StructField('Gender',StringType(),True),
    ]
)

input1DF = spark.read.format('csv').option('header',True).schema(schema).load('sample_data\input1.csv')
input1DF.show()
input2DF = spark.read.option('header',True).csv('sample_data\input2.csv',schema=schema)
input2DF.show()

result = input1DF.union(input2DF)
result.show()



+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|  null|
|  Arvind| 24|  null|
|Rishitha| 24|  null|
|  Anusha| 24|  null|
| Gayatri| 25|  null|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|     F|
|  Arvind| 24|     M|
|Rishitha| 24|     F|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|  null|
|  Arvind| 24|  null|
|Rishitha| 24|  null|
|  Anusha| 24|  null|
| Gayatri| 25|  null|
| Monisha| 23|     F|
|  Arvind| 24|     M|
|Rishitha| 24|     F|
+--------+---+------+



Another method

In [24]:
input1DF = spark.read.format('csv').option('header',True).load('sample_data\input1.csv')
input1DF.show()
input2DF = spark.read.format('csv').option('header',True).load('sample_data\input2.csv')
input2DF.show()

result = input1DF.join(input2DF,on=['Name','Age'],how='outer')
result.show()

result = input1DF.join(input2DF,[input1DF.Name==input2DF.Name,input1DF.Age==input2DF.Age],how='outer').select(input1DF.Name,input1DF.Age,input2DF.Gender)
result.show()


+--------+---+
|    Name|Age|
+--------+---+
| Monisha| 23|
|  Arvind| 24|
|Rishitha| 24|
|  Anusha| 24|
| Gayatri| 25|
+--------+---+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|     F|
|  Arvind| 24|     M|
|Rishitha| 24|     F|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Anusha| 24|  null|
|  Arvind| 24|     M|
| Gayatri| 25|  null|
| Monisha| 23|     F|
|Rishitha| 24|     F|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Anusha| 24|  null|
|  Arvind| 24|     M|
| Gayatri| 25|  null|
| Monisha| 23|     F|
|Rishitha| 24|     F|
+--------+---+------+



Best approach - Automated

In [25]:
input1DF = spark.read.format('csv').option('header',True).load('sample_data\input1.csv')
input1DF.show()
input2DF = spark.read.format('csv').option('header',True).load('sample_data\input2.csv')
input2DF.show()

listA = set(input1DF.columns)-set(input2DF.columns)
listB = set(input2DF.columns)-set(input1DF.columns)

for i in listA:
    input2DF = input2DF.withColumn(i,lit(None))

for i in listB:
    input1DF = input1DF.withColumn(i,lit(None))
    
resut = input1DF.union(input2DF)

result.show()

+--------+---+
|    Name|Age|
+--------+---+
| Monisha| 23|
|  Arvind| 24|
|Rishitha| 24|
|  Anusha| 24|
| Gayatri| 25|
+--------+---+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
| Monisha| 23|     F|
|  Arvind| 24|     M|
|Rishitha| 24|     F|
+--------+---+------+

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Anusha| 24|  null|
|  Arvind| 24|     M|
| Gayatri| 25|  null|
| Monisha| 23|     F|
|Rishitha| 24|     F|
+--------+---+------+



Apply line break every 5th occurance from | delimited input.txt

In [49]:
from pyspark.sql.functions import regexp_replace,explode,split

input = spark.read.csv('sample_data\input.txt')

input.show(truncate=False)

input = input.withColumn("chk",regexp_replace("_c0","(.*?\\|){4}","$0-"))

input.show(truncate=False)

input = input.withColumn('col_explode',explode(split('chk','\|-')))

input.select(input.col_explode).show()

result = input.select(input.col_explode)

result.show()




+--------------------------------------------------------------------------------------------------------+
|_c0                                                                                                     |
+--------------------------------------------------------------------------------------------------------+
|A|BE|1|25|B|BSC|2|27|A|BE|1|25|B|BSC|2|27|A|BE|1|25|B|BSC|2|27|A|BE|1|25|B|BSC|2|27|A|BE|1|25|B|BSC|2|27|
+--------------------------------------------------------------------------------------------------------+

+--------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                     |chk                                                                                                              |
+---------------------

In [55]:
result.select(split('col_explode','\|')).show()

result.rdd.map(lambda i:i).collect()




+--------------------------+
|split(col_explode, \|, -1)|
+--------------------------+
|            [A, BE, 1, 25]|
|           [B, BSC, 2, 27]|
|            [A, BE, 1, 25]|
|           [B, BSC, 2, 27]|
|            [A, BE, 1, 25]|
|           [B, BSC, 2, 27]|
|            [A, BE, 1, 25]|
|           [B, BSC, 2, 27]|
|            [A, BE, 1, 25]|
|           [B, BSC, 2, 27]|
+--------------------------+



[Row(col_explode='A|BE|1|25'),
 Row(col_explode='B|BSC|2|27'),
 Row(col_explode='A|BE|1|25'),
 Row(col_explode='B|BSC|2|27'),
 Row(col_explode='A|BE|1|25'),
 Row(col_explode='B|BSC|2|27'),
 Row(col_explode='A|BE|1|25'),
 Row(col_explode='B|BSC|2|27'),
 Row(col_explode='A|BE|1|25'),
 Row(col_explode='B|BSC|2|27')]

In [None]:
result.rdd.map(lambda i:len(i)).collect()


In [None]:
result_rdd = result.rdd.map(lambda i:i[0].split('|'))

result = result_rdd.toDF(['Name','Qualification','S.no','Age']).show()

Read files recursively under Folders

In [17]:
from pyspark.sql.functions import input_file_name

df1 = spark.read.format('csv').option('header',True).load('sample_data/recursive').withColumn('filepath',input_file_name())

df1.show(truncate=False)

paths = [
    'sample_data/recursive',
    'sample_data/recursive/level1'
]

df1 = spark.read.format('csv').option('header',True).load(paths).withColumn('filepath',input_file_name())

df1.show(truncate=False)

df1 = spark.read.format('csv').option('header',True).option('recursiveFileLookup',True).load(paths).withColumn('filepath',input_file_name())

df1.show(truncate=False)

+--------+---+--------------------------------------------------------------+
|Name    |Age|filepath                                                      |
+--------+---+--------------------------------------------------------------+
|Monisha |23 |file:///c:/Users/varsh/pyspark/sample_data/recursive/file1.csv|
|Arvind  |24 |file:///c:/Users/varsh/pyspark/sample_data/recursive/file1.csv|
|Rishitha|24 |file:///c:/Users/varsh/pyspark/sample_data/recursive/file1.csv|
|Anusha  |24 |file:///c:/Users/varsh/pyspark/sample_data/recursive/file1.csv|
|Gayatri |25 |file:///c:/Users/varsh/pyspark/sample_data/recursive/file1.csv|
+--------+---+--------------------------------------------------------------+

+--------+---+---------------------------------------------------------------------+
|Name    |Age|filepath                                                             |
+--------+---+---------------------------------------------------------------------+
|Monisha |23 |file:///c:/Users/varsh/pyspa

Which one returns results faster ?

In [None]:
#1
df = spark.read.csv()
df = df.filter()
df = df.sort()
df.count()
#2
df = spark.read.csv()
df = df.sort()
df = df.filter()
df.count()
#3
df = spark.read.csv()
df.sort()
df = df.filter()
df = df.sort()
df.count()
#4
df = spark.read.csv()
df.sort()
df = df.sort()
df = df.filter()
df.count()

# Predicate pushdown pushes the filter to the start even if sort is given first

# option A performs better, Caching took time, this is not the right case for doing caching

Inferschema Vs Schema definition

Always define schema manually since inferschema scans the table twice

Data skew

In [19]:
bank = spark.read.format('csv').option('header',True).option('inferSchema',True).load('sample_data/BankChurners.csv')

bank.show()

bank.rdd.getNumPartitions()

+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+--------------------+---------------+--------------+-------------------+---------------------+
|CLIENTNUM|   Attrition_Flag|Customer_Age|Gender|Dependent_count|Education_Level|Marital_Status|Income_Category|Card_Category|Months_on_book|Total_Relationship_Count|Months_Inactive_12_mon|Contacts_Count_12_mon|Credit_Limit|Total_Revolving_Bal|Avg_Open_To_Buy|Total_Amt_Chng_Q4_Q1|Total_Trans_Amt|Total_Trans_Ct|Total_Ct_Chng_Q4_Q1|Avg_Utilization_Ratio|
+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+--------------------+---------

1

In [21]:
input = bank.repartition(4)
input.rdd.getNumPartitions()

4

In [None]:
from pyspark.sql.functions import spark_partition_id

input = input.withColumn('partition_id',spark_partition_id())
input.show()

countByPartition = input.groupBy(input.partition_id).count()
countByPartition.show()

# Data is evenly distributed

+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+--------------------+---------------+--------------+-------------------+---------------------+------------+
|CLIENTNUM|   Attrition_Flag|Customer_Age|Gender|Dependent_count|Education_Level|Marital_Status|Income_Category|Card_Category|Months_on_book|Total_Relationship_Count|Months_Inactive_12_mon|Contacts_Count_12_mon|Credit_Limit|Total_Revolving_Bal|Avg_Open_To_Buy|Total_Amt_Chng_Q4_Q1|Total_Trans_Amt|Total_Trans_Ct|Total_Ct_Chng_Q4_Q1|Avg_Utilization_Ratio|partition_id|
+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+----

In [29]:
input = bank.repartition(200,"Card_Category")
input.rdd.getNumPartitions()

200

In [30]:
input = input.withColumn('partition_id',spark_partition_id())
input.show()

countByPartition = input.groupBy(input.partition_id).count()
countByPartition.show()

+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+--------------------+---------------+--------------+-------------------+---------------------+------------+
|CLIENTNUM|   Attrition_Flag|Customer_Age|Gender|Dependent_count|Education_Level|Marital_Status|Income_Category|Card_Category|Months_on_book|Total_Relationship_Count|Months_Inactive_12_mon|Contacts_Count_12_mon|Credit_Limit|Total_Revolving_Bal|Avg_Open_To_Buy|Total_Amt_Chng_Q4_Q1|Total_Trans_Amt|Total_Trans_Ct|Total_Ct_Chng_Q4_Q1|Avg_Utilization_Ratio|partition_id|
+---------+-----------------+------------+------+---------------+---------------+--------------+---------------+-------------+--------------+------------------------+----------------------+---------------------+------------+-------------------+---------------+----

Column methods


Method	Returns	Use Case
df.columns	List of strings	Quick column list
df.schema.names	List of strings	Same as above
df.dtypes	List of (name, type) tuples	View names + types
df.schema	StructType	Full schema object
df.printSchema()	Printed tree	Visual inspection
[f.name for f in df.schema.fields]	List of names	Custom logic over schema

Calculate expriy date 

In [34]:
exp_df = spark.read.format('csv').option('header',True).option('inferSchema',True).load('sample_data/expiry.csv')

exp_df.show()
print(exp_df.schema)

+---+------------+--------+
| id|rechargeDate|validity|
+---+------------+--------+
|  1|    20200511|      20|
|  2|    20200119|      13|
|  3|    20200405|     120|
+---+------------+--------+

StructType([StructField('id', IntegerType(), True), StructField('rechargeDate', IntegerType(), True), StructField('validity', IntegerType(), True)])


In [40]:
from pyspark.sql.functions import date_add,to_date,col,expr

exp_df = exp_df.withColumn('date',to_date(col('rechargeDate').cast('string'),"yyyyMMdd"))
exp_df.show()

exp_df = exp_df.withColumn('expiryDate',date_add('date','validity'))
exp_df.show()

+---+------------+--------+----------+
| id|rechargeDate|validity|      date|
+---+------------+--------+----------+
|  1|    20200511|      20|2020-05-11|
|  2|    20200119|      13|2020-01-19|
|  3|    20200405|     120|2020-04-05|
+---+------------+--------+----------+

+---+------------+--------+----------+----------+
| id|rechargeDate|validity|      date|expiryDate|
+---+------------+--------+----------+----------+
|  1|    20200511|      20|2020-05-11|2020-05-31|
|  2|    20200119|      13|2020-01-19|2020-02-01|
|  3|    20200405|     120|2020-04-05|2020-08-03|
+---+------------+--------+----------+----------+



Merge 2 complex dataframes 

In [51]:
json1 = spark.read.json('sample_data/input1.json',multiLine=True)
json2 = spark.read.json('sample_data/input2.json',multiLine=True)


json1.printSchema()
json1.show()
json1.columns

root
 |-- Education: struct (nullable = true)
 |    |-- Age: long (nullable = true)
 |    |-- Qualification: string (nullable = true)
 |    |-- Year: long (nullable = true)
 |-- Name: string (nullable = true)

+----------------+----+
|       Education|Name|
+----------------+----+
|{28, BCOM, 2013}|Azar|
|  {24, BE, 2021}|Amol|
+----------------+----+



['Education', 'Name']

In [64]:
json1.schema

StructType([StructField('Education', StructType([StructField('Age', LongType(), True), StructField('Qualification', StringType(), True), StructField('Year', LongType(), True)]), True), StructField('Name', StringType(), True)])

In [52]:
from pyspark.sql.functions import struct, lit, col
from pyspark.sql.types import *

def flatten_struct(schema, prefix=""):
    result = []
    for elem in schema:
        if isinstance(elem.dataType,StructType):
            result+=flatten_struct(elem.dataType, prefix+elem.name+".")
        else:
            result.append(col(prefix+elem.name).alias(prefix+elem.name))
    return result

l1 = flatten_struct(json1.schema)
l2 = flatten_struct(json2.schema)

In [74]:
col1 = []
col2 = []
for i in l1:
    col1.append(str(i).replace("AS","'").split("'")[1].strip())
for i in l2:
    col2.append(str(i).replace("AS","'").split("'")[1].strip())

In [97]:
print(col1)
print(col2)
diff = set(col1)-set(col2)
print(diff)

for i in diff:
    if('.' in i):
        c,cn = i.split(".")
        s_type = json1.schema[c].dataType[cn].dataType
        print(s_type)
        s_fields = json2.schema[c].dataType.names
        print(s_fields)
        inDf = json2.withColumn(c,struct(*([col(c)[record].alias(record) for record in s_fields] + [lit(None).cast(s_type).alias(cn)])))
        s_fields = sorted(inDf.schema[c].dataType.names)
        inDf.show()
        inDf = inDf.withColumn(c,struct(*([col(c)[record].alias(record) for record in s_fields])))
        inDf.show()

['Education.Age', 'Education.Qualification', 'Education.Year', 'Name']
['Education.Qualification', 'Education.Year', 'Name']
{'Education.Age'}
LongType()
['Qualification', 'Year']
+-----------------+------+
|        Education|  Name|
+-----------------+------+
|{BSC, 2012, null}|Benita|
|{MSC, 2022, null}| Bavya|
+-----------------+------+

+-----------------+------+
|        Education|  Name|
+-----------------+------+
|{null, BSC, 2012}|Benita|
|{null, MSC, 2022}| Bavya|
+-----------------+------+



In [98]:
inDf.printSchema()

root
 |-- Education: struct (nullable = false)
 |    |-- Age: long (nullable = true)
 |    |-- Qualification: string (nullable = true)
 |    |-- Year: long (nullable = true)
 |-- Name: string (nullable = true)



In [100]:
json1.printSchema()
outputDf = json1.union(inDf)
outputDf.show()

root
 |-- Education: struct (nullable = true)
 |    |-- Age: long (nullable = true)
 |    |-- Qualification: string (nullable = true)
 |    |-- Year: long (nullable = true)
 |-- Name: string (nullable = true)

+-----------------+------+
|        Education|  Name|
+-----------------+------+
| {28, BCOM, 2013}|  Azar|
|   {24, BE, 2021}|  Amol|
|{null, BSC, 2012}|Benita|
|{null, MSC, 2022}| Bavya|
+-----------------+------+



Speculative Execution in Spark

Speculative execution in Apache Spark is a performance optimization technique that aims to handle slow or straggling tasks during a job execution. In distributed systems like Spark, tasks are run in parallel across multiple workers. However, due to various reasons (e.g., resource contention, network latency, node failures), some tasks may take much longer to complete than others. These slow tasks are known as "stragglers."

Speculative execution helps mitigate the impact of these straggler tasks by launching duplicate copies of the slow tasks on other nodes. The first copy that finishes is considered the result, and the others are discarded. This helps reduce the overall job completion time.

In [None]:
spark.conf('spark.speculation',True) # Enables or disables speculative execution (default is false).
spark.conf('spark.speculation.interval',200) # The frequency at which Spark checks for straggler tasks.
spark.conf('spark.speculation.multiplier',5) # threshold = median_task_duration * spark.speculation.multiplier
# Where:

# median_task_duration is the time it takes for most tasks to finish in the same stage.

# spark.speculation.multiplier is the factor that defines how much longer a task can take before it is considered a straggler.

# If you have spark.speculation.multiplier = 1.5 and the median time for tasks in a stage is 10 seconds, the threshold for considering a task as a straggler would be:

# threshold = 10 * 1.5 = 15 seconds
# In this case, if any task takes longer than 15 seconds to complete, Spark will consider it a candidate for speculative execution and will launch a duplicate copy of the task on another executor

spark.conf('spark.speculation.quantile',0.75) # The quantile of tasks that are considered for speculative execution (e.g., if you set this to 0.75, Spark will speculate on the slowest 25% of tasks)

How to handle corrupt/bad records

Modes in spark.read()
1. PERMISSIVE
2. FAILFAST
3. DROPMALFORMED

In [102]:
df = spark.read.csv('sample_data/badRecords.csv',header=True,inferSchema=True)
df.show()

+-------+--------+----------+
| emp_no|emp_name|department|
+-------+--------+----------+
|      1| Murugan|        IT|
|invalid| invalid|      null|
|      2|  Kannan|   Finance|
|      3|   Mohan|      null|
|      4|   Pavan|        HR|
+-------+--------+----------+



In [103]:
df = spark.read.option('mode','FAILFAST').csv('sample_data/badRecords.csv',header=True,inferSchema=True)
df.show()

Py4JJavaError: An error occurred while calling o659.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 123.0 failed 1 times, most recent failure: Lost task 0.0 in stage 123.0 (TID 432) (localhost executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:330)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 20 more
Caused by: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedCSVRecordError(QueryExecutionErrors.scala:1222)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:298)
	... 23 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:330)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 20 more
Caused by: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedCSVRecordError(QueryExecutionErrors.scala:1222)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:298)
	... 23 more


In [None]:
df = spark.read.option('mode','DROPMALFORMED').csv('sample_data/badRecords.csv',header=True,inferSchema=True)
df.show()

+------+--------+----------+
|emp_no|emp_name|department|
+------+--------+----------+
|     1| Murugan|        IT|
|     2|  Kannan|   Finance|
|     4|   Pavan|        HR|
+------+--------+----------+

root
 |-- emp_no: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- department: string (nullable = true)



In [None]:
schema = StructType(
    [
        StructField('emp_no',IntegerType()),
        StructField('emp_name',StringType(),True),
        StructField('department',StringType(),True)
    ]
)
df = spark.read.option('mode','DROPMALFORMED').schema(schema=schema).csv('sample_data/badRecords.csv',header=True)
df.show()

+------+--------+----------+
|emp_no|emp_name|department|
+------+--------+----------+
|     1| Murugan|        IT|
|     2|  Kannan|   Finance|
|     3|   Mohan|      null|
|     4|   Pavan|        HR|
+------+--------+----------+



In [138]:
schema = StructType(
    [
        StructField('emp_no',IntegerType()),
        StructField('emp_name',StringType(),True),
        StructField('department',StringType(),True)
    ]
)
df = spark.read.schema(schema=schema).csv('sample_data/badRecords.csv',header=True)
df.show()

+------+--------+----------+
|emp_no|emp_name|department|
+------+--------+----------+
|     1| Murugan|        IT|
|  null| invalid|      null|
|     2|  Kannan|   Finance|
|     3|   Mohan|      null|
|     4|   Pavan|        HR|
+------+--------+----------+



How to save bad records from a json to a file

Option 1: Use mode("PERMISSIVE") and filter out null records to capture bad records.

Option 2: Define a custom schema and use exception handling to capture bad records.

Option 3: Use RDD transformations like map and filter to catch bad records and process them.

In [145]:
df.filter(df.emp_no.isNull()).write.save('sample_data/bad_records.txt')

Multi Delimiter

In [11]:
from pyspark.sql.functions import split

df = spark.read.option('delimiter','~|').csv('sample_data/multidelimiter.csv',header=True)
df.show()

df = df.withColumn('Initial',split('Name',',')[1])
df.show()

df = df.withColumn('Name',split('Name',',')[0])
df.show()


+-----------+---+
|       Name|Age|
+-----------+---+
|Varshini, S| 25|
|   Jothi, S| 52|
| Sathish, S| 55|
| Neelesh, S| 29|
+-----------+---+

+-----------+---+-------+
|       Name|Age|Initial|
+-----------+---+-------+
|Varshini, S| 25|      S|
|   Jothi, S| 52|      S|
| Sathish, S| 55|      S|
| Neelesh, S| 29|      S|
+-----------+---+-------+

+--------+---+-------+
|    Name|Age|Initial|
+--------+---+-------+
|Varshini| 25|      S|
|   Jothi| 52|      S|
| Sathish| 55|      S|
| Neelesh| 29|      S|
+--------+---+-------+



Map vs FlatMap

Data Skewness

In spark-3, AQE handles it by itself

In spark-2:

1. we can try with repartition - Splits the data equally

Repartitioning by a more evenly distributed column
df = df.repartition("region_id")

Or just increasing partitions to spread out skew
df = df.repartition(100)

Challenges:

df.repartition("user_id")
If user_id is skewed (like 90% of rows are for user_1), all that data still ends up on one partition — just a different one. So:

You've shuffled the data ✔️

But the skew is still there ❌

✅ Repartitioning just moves the problem to a different executor.

Skew Happens During a Join
This is a big one.

Even if both DataFrames are repartitioned before the join:

df1 = df1.repartition("user_id")
df2 = df2.repartition("user_id")
df1.join(df2, "user_id")
If user_id = user_1 exists millions of times in both tables, all matching rows go to one task during the shuffle phase. So again:

Partitioning was done ✔️

But data volume per key caused skew ❌

✅ What helps instead: salting, or broadcast joins if one side is small.

Not Enough Partitions to Spread the Load
If you do:

df.repartition(4)
...but your data has millions of rows, and one key dominates, you're not helping much.

Even worse — repartitioning with too few partitions can make the skew worse, because you’re squeezing more data into fewer slots.

✅ Use more partitions, and sometimes pair with salting.

4. You Repartition by a Column That's Also Skewed
If you repartition by a column that’s also skewed, it just moves the skew around:


df.repartition("country")  # But 90% of users are from "US"
Now all the "US" rows go to one partition. Not helpful.

✅ Better to repartition on a less skewed column or combine columns to reduce concentration.



2. Salting

purchases DataFrame:

user_id	purchase_amount
user_1	100
user_1	50
user_1	200
user_2	30
user_3	70
Notice how user_1 is skewed — lots of rows for just one key.

🔧 Step 1: Add a salt
We want to break user_1 into multiple salted keys so Spark can parallelize the work.

We'll assign a random salt value (between 0 and 1, for this demo — can be more in real world).

Let's randomly assign:

user_id	purchase_amount	salt	user_id_salted
user_1	100	0	user_1_0
user_1	50	1	user_1_1
user_1	200	0	user_1_0
user_2	30	0	user_2_0
user_3	70	1	user_3_1
This spreads the user_1 records across two partitions (user_1_0 and user_1_1).

🧮 Step 2: GroupBy on the salted key
Now, do the aggregation:

grouped = salted_df.groupBy("user_id_salted").agg(sum("purchase_amount").alias("partial_sum"))
Result:

user_id_salted	partial_sum
user_1_0	100 + 200 = 300
user_1_1	50
user_2_0	30
user_3_1	70
Nice! Now the work was split between user_1_0 and user_1_1, avoiding a bottleneck.

🧹 Step 3: Extract original key and re-group
Now we strip off the salt (e.g., split on _), and group by the original user_id:

final_df = grouped \
    .withColumn("user_id", split(col("user_id_salted"), "_")[0]) \
    .groupBy("user_id") \
    .agg(sum("partial_sum").alias("total_sum"))
Result:

user_id	total_sum
user_1	300 + 50 = 350
user_2	30
user_3	70
✅ That's your final answer, and you’ve handled the skewed user_1 cleanly and efficiently.

Partitioning

spark.read.parquet()	Based on file count & block size
groupBy, join, distinct, etc.	spark.sql.shuffle.partitions (default: 200)
df.rdd.getNumPartitions()	Returns current partition count
df.write.parquet("s3://your-bucket/output/") Will write 200 or df.rdd.getNumPartitions() files (usually .snappy.parquet files) to that path.



Small files problem

In Spark, every partition = one output file (usually .parquet, .json, .csv, etc.).

So if your DataFrame has 10,000 tiny partitions (maybe because each partition only has a few rows), then:

df.write.parquet("s3://bucket/output/")
🚨 Creates 10,000 small files.

1. Too Many Files Overwhelm Metadata Systems
Systems like HDFS, S3, Hive Metastore hate dealing with thousands of tiny files.

Every file = a metadata entry.

This bloats memory usage in NameNode (HDFS) or slows down listing/querying (S3, Glue, Hive).

2. Bad Performance When Reading
When you query later (e.g., using Spark, Athena, Presto), it opens lots of small files.

This means lots of I/O operations, network calls, and latency.

You’ll see poor parallelism: thousands of files → tasks that finish in milliseconds but cause tons of overhead.

3. Wasted Storage and Throughput
Each file has some overhead (compression headers, footers, etc.).

Small files don't utilize block storage well (esp. in HDFS), wasting space and reducing write efficiency.

Best Practices

File Format	Target File Size
Parquet	128 MB - 1 GB
ORC	256 MB - 1 GB
CSV/JSON	Smaller, but still 10+ MB recommended

💡 Large files = fewer tasks, better compression, better performance.

Flatten the json content

In [None]:
df = spark.read.csv('sample_data/json.csv',header=True)
df.show(truncate=False)

# Value is truncated

# These two options — quote and escape — are 🔑 when dealing with complex strings (like embedded JSON) in CSV files.

# option("quote", '"')
# This tells Spark:

# "Text between this character is considered as a single field — even if it has commas."

# option("escape", '"')
# This tells Spark:

# "If this character appears inside a quoted field, treat it as part of the string — not the end of the quote."

# quote	Defines boundaries of a field — usually ".
# escape	Tells Spark how to handle quotes inside a quoted field.

df = spark.read.option('quote','"').csv('sample_data/json.csv',header=True)
df.show(truncate=False)

# Even this is not working

+----------+-------+--------------------------------------+
|date      |status |request                               |
+----------+-------+--------------------------------------+
|2025-04-10|success|"{"response": {"message_id": "msg_001"|
|2025-04-10|error  |"{"response": {"message_id": "msg_002"|
|2025-04-09|success|"{"response": {"message_id": "msg_003"|
|2025-04-09|error  |"{"response": {"message_id": "msg_004"|
|2025-04-08|success|"{"response": {"message_id": "msg_005"|
|2025-04-08|success|"{"response": {"message_id": "msg_006"|
|2025-04-07|error  |"{"response": {"message_id": "msg_007"|
|2025-04-07|success|"{"response": {"message_id": "msg_008"|
+----------+-------+--------------------------------------+

+----------+-------+--------------------------------------+
|date      |status |request                               |
+----------+-------+--------------------------------------+
|2025-04-10|success|"{"response": {"message_id": "msg_001"|
|2025-04-10|error  |"{"response": {"mes

JSON spec requires double quotes for strings and keys
Valid JSON:

json
Copy
Edit
{"response": {"message_id": "msg_001"}}
Invalid JSON:

json
Copy
Edit
{'response': {'message_id': 'msg_001'}}
Tools like from_json() in Spark (and any proper JSON parser) will reject single quotes.

In [18]:

df = spark.read.option('quote','\'').csv('sample_data/json.csv',header=True)
df.show(truncate=False)

+----------+-------+------------------------------------------------------------------------------------+
|date      |status |request                                                                             |
+----------+-------+------------------------------------------------------------------------------------+
|2025-04-10|success|{"response": {"message_id": "msg_001", "latitude": 37.7749, "longitude": -122.4194}}|
|2025-04-10|error  |{"response": {"message_id": "msg_002", "latitude": 40.7128, "longitude": -74.0060}} |
|2025-04-09|success|{"response": {"message_id": "msg_003", "latitude": 34.0522, "longitude": -118.2437}}|
|2025-04-09|error  |{"response": {"message_id": "msg_004", "latitude": 51.5074, "longitude": -0.1278}}  |
|2025-04-08|success|{"response": {"message_id": "msg_005", "latitude": 48.8566, "longitude": 2.3522}}   |
|2025-04-08|success|{"response": {"message_id": "msg_006", "latitude": 52.5200, "longitude": 13.4050}}  |
|2025-04-07|error  |{"response": {"message_id"

Method-1: Using json_tuple

We should know the fields to extract

In [22]:
from pyspark.sql.functions import json_tuple

df.select('*',json_tuple('request','response')).show()

df1 = df.select('*',json_tuple('request','response')).drop('request')

df1.show()

+----------+-------+--------------------+--------------------+
|      date| status|             request|                  c0|
+----------+-------+--------------------+--------------------+
|2025-04-10|success|{"response": {"me...|{"message_id":"ms...|
|2025-04-10|  error|{"response": {"me...|{"message_id":"ms...|
|2025-04-09|success|{"response": {"me...|{"message_id":"ms...|
|2025-04-09|  error|{"response": {"me...|{"message_id":"ms...|
|2025-04-08|success|{"response": {"me...|{"message_id":"ms...|
|2025-04-08|success|{"response": {"me...|{"message_id":"ms...|
|2025-04-07|  error|{"response": {"me...|{"message_id":"ms...|
|2025-04-07|success|{"response": {"me...|{"message_id":"ms...|
+----------+-------+--------------------+--------------------+

+----------+-------+--------------------+
|      date| status|                  c0|
+----------+-------+--------------------+
|2025-04-10|success|{"message_id":"ms...|
|2025-04-10|  error|{"message_id":"ms...|
|2025-04-09|success|{"message_id"

In [27]:
df1.select('*',json_tuple('c0','message_id','latitude','longitude').alias('message_id','latitude','longitude')).show()

df2 = df1.select('*',json_tuple('c0','message_id','latitude','longitude').alias('message_id','latitude','longitude')).drop('c0')

df2.show()

+----------+-------+--------------------+----------+--------+---------+
|      date| status|                  c0|message_id|latitude|longitude|
+----------+-------+--------------------+----------+--------+---------+
|2025-04-10|success|{"message_id":"ms...|   msg_001| 37.7749|-122.4194|
|2025-04-10|  error|{"message_id":"ms...|   msg_002| 40.7128|  -74.006|
|2025-04-09|success|{"message_id":"ms...|   msg_003| 34.0522|-118.2437|
|2025-04-09|  error|{"message_id":"ms...|   msg_004| 51.5074|  -0.1278|
|2025-04-08|success|{"message_id":"ms...|   msg_005| 48.8566|   2.3522|
|2025-04-08|success|{"message_id":"ms...|   msg_006|   52.52|   13.405|
|2025-04-07|  error|{"message_id":"ms...|   msg_007| 35.6895| 139.6917|
|2025-04-07|success|{"message_id":"ms...|   msg_008| 34.0522|-118.2437|
+----------+-------+--------------------+----------+--------+---------+

+----------+-------+----------+--------+---------+
|      date| status|message_id|latitude|longitude|
+----------+-------+----------+--

Method-2: Using from_json

In [32]:
from pyspark.sql.functions import from_json,col
from pyspark.sql.types import *

json_schema = StructType([
    StructField("response", StructType([
        StructField("message_id", StringType()),
        StructField("latitude", DoubleType()),
        StructField("longitude", DoubleType())
    ]))
])

df_parsed = df.withColumn("request_parsed", from_json(col("request"), json_schema))
df_parsed.show(truncate=False)

df_flat = df_parsed.select(
    "date",
    "status",
    col("request_parsed.response.message_id").alias("message_id"),
    col("request_parsed.response.latitude").alias("latitude"),
    col("request_parsed.response.longitude").alias("longitude")
)

df_flat.show(truncate=False)


+----------+-------+------------------------------------------------------------------------------------+-------------------------------+
|date      |status |request                                                                             |request_parsed                 |
+----------+-------+------------------------------------------------------------------------------------+-------------------------------+
|2025-04-10|success|{"response": {"message_id": "msg_001", "latitude": 37.7749, "longitude": -122.4194}}|{{msg_001, 37.7749, -122.4194}}|
|2025-04-10|error  |{"response": {"message_id": "msg_002", "latitude": 40.7128, "longitude": -74.0060}} |{{msg_002, 40.7128, -74.006}}  |
|2025-04-09|success|{"response": {"message_id": "msg_003", "latitude": 34.0522, "longitude": -118.2437}}|{{msg_003, 34.0522, -118.2437}}|
|2025-04-09|error  |{"response": {"message_id": "msg_004", "latitude": 51.5074, "longitude": -0.1278}}  |{{msg_004, 51.5074, -0.1278}}  |
|2025-04-08|success|{"response": {

Above case we are manually giving the schema, but we can also get it from the data and pass it

In [42]:
df.select(col('request').alias('jsonCol')).rdd.map(lambda x:x.jsonCol).collect()

df_schema = spark.read.json(df.select(col('request').alias('jsonCol')).rdd.map(lambda x: x.jsonCol)).schema

df_schema



StructType([StructField('response', StructType([StructField('latitude', DoubleType(), True), StructField('longitude', DoubleType(), True), StructField('message_id', StringType(), True)]), True)])

In [None]:
df_parsed = df.withColumn("request_parsed", from_json(col("request"), df_schema))
df_parsed.show(truncate=False)

df_flat = df_parsed.select(
    "date",
    "status",
    col("request_parsed.response.message_id").alias("message_id"),
    col("request_parsed.response.latitude").alias("latitude"),
    col("request_parsed.response.longitude").alias("longitude")
)

df_flat.show(truncate=False)

+----------+-------+------------------------------------------------------------------------------------+-------------------------------+
|date      |status |request                                                                             |request_parsed                 |
+----------+-------+------------------------------------------------------------------------------------+-------------------------------+
|2025-04-10|success|{"response": {"message_id": "msg_001", "latitude": 37.7749, "longitude": -122.4194}}|{{37.7749, -122.4194, msg_001}}|
|2025-04-10|error  |{"response": {"message_id": "msg_002", "latitude": 40.7128, "longitude": -74.0060}} |{{40.7128, -74.006, msg_002}}  |
|2025-04-09|success|{"response": {"message_id": "msg_003", "latitude": 34.0522, "longitude": -118.2437}}|{{34.0522, -118.2437, msg_003}}|
|2025-04-09|error  |{"response": {"message_id": "msg_004", "latitude": 51.5074, "longitude": -0.1278}}  |{{51.5074, -0.1278, msg_004}}  |
|2025-04-08|success|{"response": {

In [53]:
df_parsed.printSchema()
json_col = df_parsed.schema['request_parsed'].dataType.names[0]
check = "request_parsed."+json_col+".*"
check
df_parsed.select('*',col(check)).show()
df_parsed.select('*',col(check)).drop('request').drop('request_parsed').show()

root
 |-- date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- request: string (nullable = true)
 |-- request_parsed: struct (nullable = true)
 |    |-- response: struct (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- message_id: string (nullable = true)

+----------+-------+--------------------+--------------------+--------+---------+----------+
|      date| status|             request|      request_parsed|latitude|longitude|message_id|
+----------+-------+--------------------+--------------------+--------+---------+----------+
|2025-04-10|success|{"response": {"me...|{{37.7749, -122.4...| 37.7749|-122.4194|   msg_001|
|2025-04-10|  error|{"response": {"me...|{{40.7128, -74.00...| 40.7128|  -74.006|   msg_002|
|2025-04-09|success|{"response": {"me...|{{34.0522, -118.2...| 34.0522|-118.2437|   msg_003|
|2025-04-09|  error|{"response": {"me...|{{51.5074, -0.127...| 51.5074|  -0.127

Common records in file1 and file2

We can do this in many ways:
- set operations: union, unionAll, distinct, intersect, intersectAll, subtract, exceptAll
- join operations: leftouter, rightouter, leftsemi, leftanti, cartesian

set operations can be only used if both the tables has same no of columns

Both ultimately result in same time and operation

In [58]:
studentA = spark.read.option('header',True).option('inferSchema',True).csv('sample_data/studentA.csv')
studentB = spark.read.option('header',True).option('inferSchema',True).csv('sample_data/studentB.csv')
studentA.show()
studentB.show()

+------+--------------+----------+
|rollno|          name|    mobile|
+------+--------------+----------+
|   101|      John Doe|9876543210|
|   102|    Jane Smith|9123456789|
|   103|  Robert Brown|9988776655|
|   104|   Alice Green|9012345678|
|   105|  Mike Johnson|9870011223|
|   106|  Linda Taylor|9345678901|
|   107|  David Wilson|9001122334|
|   108|    Emma Davis|9212345678|
|   109|  Chris Martin|9812345678|
|   110|    Sophia Lee|8899001122|
|   111|  Daniel Lewis|9700112233|
|   112|     Mia Clark|9456789012|
|   113|   Noah Walker|9332211455|
|   114|   Olivia Hall|9543217890|
|   115|    Liam Young|9877003456|
|   116|      Ava King|9445566778|
|   117|  Ethan Wright|9789012345|
|   118|Isabella Scott|9311234567|
|   119|   Logan Adams|9654321098|
|   120|   Emily Baker|9876001122|
+------+--------------+----------+

+------+------------+----------+
|rollno|        name|    mobile|
+------+------------+----------+
|   101|    John Doe|9876543210|
|   102|  Jane Smith|912345

In [61]:
studentA.join(studentB,studentA.rollno == studentB.rollno,'inner').select(studentA.rollno,studentA.name,studentA.mobile).show()

+------+------------+----------+
|rollno|        name|    mobile|
+------+------------+----------+
|   101|    John Doe|9876543210|
|   102|  Jane Smith|9123456789|
|   103|Robert Brown|9988776655|
|   104| Alice Green|9012345678|
|   105|Mike Johnson|9870011223|
|   106|Linda Taylor|9345678901|
|   107|David Wilson|9001122334|
|   108|  Emma Davis|9212345678|
|   109|Chris Martin|9812345678|
+------+------------+----------+



Using set operation

In [None]:
studentA.intersectAll(studentB).show()

+------+------------+----------+
|rollno|        name|    mobile|
+------+------------+----------+
|   109|Chris Martin|9812345678|
|   101|    John Doe|9876543210|
|   104| Alice Green|9012345678|
|   102|  Jane Smith|9123456789|
|   106|Linda Taylor|9345678901|
|   107|David Wilson|9001122334|
|   105|Mike Johnson|9870011223|
|   103|Robert Brown|9988776655|
|   108|  Emma Davis|9212345678|
+------+------------+----------+



Records present in one file but not in another 

In [64]:
studentA.join(studentB,studentA.rollno == studentB.rollno,'leftanti').select(studentA.rollno,studentA.name,studentA.mobile).show()

+------+--------------+----------+
|rollno|          name|    mobile|
+------+--------------+----------+
|   110|    Sophia Lee|8899001122|
|   111|  Daniel Lewis|9700112233|
|   112|     Mia Clark|9456789012|
|   113|   Noah Walker|9332211455|
|   114|   Olivia Hall|9543217890|
|   115|    Liam Young|9877003456|
|   116|      Ava King|9445566778|
|   117|  Ethan Wright|9789012345|
|   118|Isabella Scott|9311234567|
|   119|   Logan Adams|9654321098|
|   120|   Emily Baker|9876001122|
+------+--------------+----------+



In [67]:
studentA.exceptAll(studentB).show()

+------+--------------+----------+
|rollno|          name|    mobile|
+------+--------------+----------+
|   116|      Ava King|9445566778|
|   112|     Mia Clark|9456789012|
|   114|   Olivia Hall|9543217890|
|   118|Isabella Scott|9311234567|
|   111|  Daniel Lewis|9700112233|
|   117|  Ethan Wright|9789012345|
|   110|    Sophia Lee|8899001122|
|   120|   Emily Baker|9876001122|
|   113|   Noah Walker|9332211455|
|   115|    Liam Young|9877003456|
|   119|   Logan Adams|9654321098|
+------+--------------+----------+



Pivot and combine

In [69]:
marks = spark.read.csv('sample_data/marks.csv',header=True,inferSchema=True)
marks.show()

+------+----+-------+---------+-------+-------+-----+
|rollno|math|physics|chemistry|biology|english|total|
+------+----+-------+---------+-------+-------+-----+
|   101|  78|     82|       76|     85|     90|  411|
|   102|  88|     79|       84|     81|     86|  418|
|   103|  65|     70|       72|     68|     74|  349|
|   104|  92|     88|       91|     90|     89|  450|
|   105|  55|     60|       58|     62|     59|  294|
|   106|  73|     75|       78|     74|     76|  376|
|   107|  80|     83|       79|     81|     82|  405|
|   108|  90|     88|       85|     89|     87|  439|
|   109|  66|     68|       64|     70|     72|  340|
|   110|  85|     87|       84|     86|     88|  430|
|   111|  77|     74|       79|     76|     78|  384|
|   112|  69|     71|       68|     70|     72|  350|
|   113|  94|     95|       93|     96|     92|  470|
|   114|  58|     62|       60|     59|     61|  300|
|   115|  87|     89|       85|     88|     90|  439|
|   116|  72|     74|       

Method:1 - Using create_map and explode

In [None]:
from pyspark.sql.functions import create_map,lit,col,explode

subject_cols = ["math", "physics", "chemistry", "biology", "english"]

# Build map of subject -> marks
pairs = []
for sub in subject_cols:
    pairs.extend([lit(sub), col(sub)])
    print(pairs)

df_map = marks.select("rollno", explode(create_map(*pairs)).alias("subject", "marks"))
df_map.show()

# 1. 📦 create_map() – Build a Map from Column Pairs

# from pyspark.sql.functions import create_map, lit, col
# Let’s say you have:


# df = spark.createDataFrame([
#     (101, 78, 82, 90)
# ], ["rollno", "math", "physics", "english"])
# To build a map like this:


# {
#   "math": 78,
#   "physics": 82,
#   "english": 90
# }
# You use create_map():


# create_map(
#     lit("math"), col("math"),
#     lit("physics"), col("physics"),
#     lit("english"), col("english")
# )
# The lit() gives you the key, the col() gives you the value.

# 2. 💥 explode() – Turn Map Entries into Rows
# Now that we have a column that’s a map, explode() will take each key-value pair in that map and create a new row.

# python
# Copy
# Edit
# df.select(
#     "rollno",
#     explode(
#         create_map(
#             lit("math"), col("math"),
#             lit("physics"), col("physics"),
#             lit("english"), col("english")
#         )
#     ).alias("subject", "marks")
# )
# 🧾 Output:
# text
# Copy
# Edit
# +-------+--------+-----+
# |rollno |subject |marks|
# +-------+--------+-----+
# |101    |math    |78   |
# |101    |physics |82   |
# |101    |english |90   |


[Column<'math'>, Column<'math'>]
[Column<'math'>, Column<'math'>, Column<'physics'>, Column<'physics'>]
[Column<'math'>, Column<'math'>, Column<'physics'>, Column<'physics'>, Column<'chemistry'>, Column<'chemistry'>]
[Column<'math'>, Column<'math'>, Column<'physics'>, Column<'physics'>, Column<'chemistry'>, Column<'chemistry'>, Column<'biology'>, Column<'biology'>]
[Column<'math'>, Column<'math'>, Column<'physics'>, Column<'physics'>, Column<'chemistry'>, Column<'chemistry'>, Column<'biology'>, Column<'biology'>, Column<'english'>, Column<'english'>]
+------+---------+-----+
|rollno|  subject|marks|
+------+---------+-----+
|   101|     math|   78|
|   101|  physics|   82|
|   101|chemistry|   76|
|   101|  biology|   85|
|   101|  english|   90|
|   102|     math|   88|
|   102|  physics|   79|
|   102|chemistry|   84|
|   102|  biology|   81|
|   102|  english|   86|
|   103|     math|   65|
|   103|  physics|   70|
|   103|chemistry|   72|
|   103|  biology|   68|
|   103|  english|

Method - 2

In [None]:
subject_list = ["math", "physics", "chemistry", "biology", "english"]
num_subjects = len(subject_list)

stack_expr = ", ".join([f"'{sub}', {sub}" for sub in subject_list])
print(stack_expr)

df_unpivoted = marks.selectExpr("rollno", f"stack({num_subjects}, {stack_expr}) as (subject, marks)")
df_unpivoted.show()

# The stack(n, col1, col2, ..., coln) function is used to unpivot columns — it turns columns into rows.

# Think of it as:

# "For each row in the original DataFrame, stack will output n new rows, using pairs of values I give it."

# 🎯 Let's say you have this data:
# text
# Copy
# Edit
# +-------+-----+--------+----------+
# |rollno |math |physics |english   |
# +-------+-----+--------+----------+
# |101    | 78  | 82     | 90       |
# You want to convert it to:

# text
# Copy
# Edit
# +-------+--------+-------+
# |rollno |subject |marks  |
# +-------+--------+-------+
# |101    |math    |78     |
# |101    |physics |82     |
# |101    |english |90     |
# 💡 Step-by-step breakdown of stack
# python
# Copy
# Edit
# df.selectExpr("rollno", "stack(3, 'math', math, 'physics', physics, 'english', english) as (subject, marks)")
# 🧩 Here's what it's doing:
# stack(3, ...):
# → Create 3 rows per original row (one per subject)

# The arguments come in pairs: → First is a literal (the subject name)
# → Second is the column (marks value)

# So this:

# python
# Copy
# Edit
# stack(3,
#   'math', math,
#   'physics', physics,
#   'english', english
# )
# Is telling Spark:

# "For each row, create 3 new rows like:

# ('math', value of math)

# ('physics', value of physics)

# ('english', value of english)"

'math', math, 'physics', physics, 'chemistry', chemistry, 'biology', biology, 'english', english
+------+---------+-----+
|rollno|  subject|marks|
+------+---------+-----+
|   101|     math|   78|
|   101|  physics|   82|
|   101|chemistry|   76|
|   101|  biology|   85|
|   101|  english|   90|
|   102|     math|   88|
|   102|  physics|   79|
|   102|chemistry|   84|
|   102|  biology|   81|
|   102|  english|   86|
|   103|     math|   65|
|   103|  physics|   70|
|   103|chemistry|   72|
|   103|  biology|   68|
|   103|  english|   74|
|   104|     math|   92|
|   104|  physics|   88|
|   104|chemistry|   91|
|   104|  biology|   90|
|   104|  english|   89|
+------+---------+-----+
only showing top 20 rows



Again pivot 

In [80]:
from pyspark.sql.functions import coalesce

pivotBack = df_unpivoted.groupBy('rollno').pivot('subject').max('marks')

df_with_total = pivotBack.withColumn(
    "total",
    coalesce(col("math"), lit(0)) +
    coalesce(col("physics"), lit(0)) +
    coalesce(col("chemistry"), lit(0)) +
    coalesce(col("english"), lit(0)) +
    coalesce(col("biology"), lit(0))
)

df_with_total.show()

+------+-------+---------+-------+----+-------+-----+
|rollno|biology|chemistry|english|math|physics|total|
+------+-------+---------+-------+----+-------+-----+
|   108|     89|       85|     87|  90|     88|  439|
|   115|     88|       85|     90|  87|     89|  439|
|   101|     85|       76|     90|  78|     82|  411|
|   103|     68|       72|     74|  65|     70|  349|
|   111|     76|       79|     78|  77|     74|  384|
|   120|     88|       89|     87|  91|     90|  445|
|   117|     64|       62|     66|  60|     65|  317|
|   112|     70|       68|     72|  69|     71|  350|
|   107|     81|       79|     82|  80|     83|  405|
|   114|     59|       60|     61|  58|     62|  300|
|   102|     81|       84|     86|  88|     79|  418|
|   113|     96|       93|     92|  94|     95|  470|
|   109|     70|       64|     72|  66|     68|  340|
|   105|     62|       58|     59|  55|     60|  294|
|   110|     86|       84|     88|  85|     87|  430|
|   106|     74|       78|  

Get duplicate records

In [None]:
df = spark.read.csv('sample_data/duplicates.csv',header=True,inferSchema=True)

df.show()

df.groupBy('rollno','name','mobile').count().where("count > 1").drop('count').show()


+------+------------+----------+
|rollno|        name|    mobile|
+------+------------+----------+
|   101|    John Doe|9876543210|
|   102|  Jane Smith|9123456789|
|   103|Robert Brown|9988776655|
|   104| Alice Green|9012345678|
|   101|    John Doe|9876543210|
|   102|  Jane Smith|9123456789|
|   103|Robert Brown|9988776655|
|   104| Alice Green|9012345678|
+------+------------+----------+

+------+------------+----------+
|rollno|        name|    mobile|
+------+------------+----------+
|   101|    John Doe|9876543210|
|   104| Alice Green|9012345678|
|   102|  Jane Smith|9123456789|
|   103|Robert Brown|9988776655|
+------+------------+----------+



In [88]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

win = Window.partitionBy('name').orderBy(col('name'))

df.withColumn('rank',row_number().over(win)).filter('rank == 2').show()

+------+------------+----------+----+
|rollno|        name|    mobile|rank|
+------+------------+----------+----+
|   104| Alice Green|9012345678|   2|
|   102|  Jane Smith|9123456789|   2|
|   101|    John Doe|9876543210|   2|
|   103|Robert Brown|9988776655|   2|
+------+------------+----------+----+



Array to columns

In [103]:
from pyspark.sql.functions import split

df = spark.read.option('quote','\'').csv('sample_data/education.csv',header=True,inferSchema=True)
df.show()

result = df.withColumn('Education',explode(split(coalesce(col('Education'),lit('None')),',')))

wind = Window.partitionBy('Name').orderBy('Education')

result.withColumn('Index',row_number().over(wind)-1).show()

+-----+---+---------------+
| Name|Age|      Education|
+-----+---+---------------+
| Azar| 24|     MBA,BE,HSC|
| Babu| 28|           null|
| Mani| 22|MBA,BSE,Diploma|
|Mohan| 29|   MBA,BArch,SC|
+-----+---+---------------+

+-----+---+---------+-----+
| Name|Age|Education|Index|
+-----+---+---------+-----+
| Azar| 24|       BE|    0|
| Azar| 24|      HSC|    1|
| Azar| 24|      MBA|    2|
| Babu| 28|     None|    0|
| Mani| 22|      BSE|    0|
| Mani| 22|  Diploma|    1|
| Mani| 22|      MBA|    2|
|Mohan| 29|    BArch|    0|
|Mohan| 29|      MBA|    1|
|Mohan| 29|       SC|    2|
+-----+---+---------+-----+



In [106]:
from pyspark.sql.functions import explode_outer

df.withColumn('Education',explode_outer(split(col('Education'),','))).show()

+-----+---+---------+
| Name|Age|Education|
+-----+---+---------+
| Azar| 24|      MBA|
| Azar| 24|       BE|
| Azar| 24|      HSC|
| Babu| 28|     null|
| Mani| 22|      MBA|
| Mani| 22|      BSE|
| Mani| 22|  Diploma|
|Mohan| 29|      MBA|
|Mohan| 29|    BArch|
|Mohan| 29|       SC|
+-----+---+---------+



Ambiguous column name during flattening

Spark will automatically detect duplicate columns and while reading it throws error 

In [111]:
df = spark.read.option('multiline',True).json('sample_data/ambicol.json')
df.show()
df.printSchema()

df1 = df.select('*','delivery.*').drop('delivery')
df1.show()

+--------------------+----------+---------------+
|            delivery|    mobile|           name|
+--------------------+----------+---------------+
|{123 Park Ave, Ne...|9876543210|       John Doe|
|{456 Elm St, Los ...|9988776655|    Alice Smith|
|{789 Oak Dr, Chic...|9870011223|Michael Johnson|
+--------------------+----------+---------------+

root
 |-- delivery: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- mobile: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- mobile: string (nullable = true)
 |-- name: string (nullable = true)

+----------+---------------+--------------------+----------+-------------+
|    mobile|           name|             address|    mobile|         name|
+----------+---------------+--------------------+----------+-------------+
|9876543210|       John Doe|123 Park Ave, New...|9123456789|     Jane Doe|
|9988776655|    Alice Smith|456 Elm St, Los A...|9012345678|    Bob Smith|
|9870011223|Michael Johnso

In [112]:
df1.select('name').show()

AnalysisException: Reference 'name' is ambiguous, could be: name, name.

In [115]:
l = df1.columns
d = {}
for i in range(len(l)):
    if(d.get(l[i])):
        l[i] = l[i] + '_' + str(d.get(l[i]))
    else:
        d[l[i]] = i 

df2 = df1.toDF(*l)
df2.show()



+----------+---------------+--------------------+----------+-------------+
|    mobile|           name|             address|    mobile|       name_1|
+----------+---------------+--------------------+----------+-------------+
|9876543210|       John Doe|123 Park Ave, New...|9123456789|     Jane Doe|
|9988776655|    Alice Smith|456 Elm St, Los A...|9012345678|    Bob Smith|
|9870011223|Michael Johnson|789 Oak Dr, Chica...|9345678901|Sarah Johnson|
+----------+---------------+--------------------+----------+-------------+



RDD to DF and vice versa

In [125]:
# l = ['a','b','c'] X wrong

l = [['a'],['b'],['c']]

rdd = spark.sparkContext.parallelize(l)
rdd.collect()

df = spark.createDataFrame(rdd,['name'])
df.show()

rdd.toDF(['name']).show()

# Your code is almost correct, but there’s a small mismatch in how you're converting the RDD to a DataFrame — specifically, the RDD elements are strings ('a', 'b', 'c'), but Spark expects rows or tuples when creating a DataFrame with a schema.


+----+
|name|
+----+
|   a|
|   b|
|   c|
+----+

+----+
|name|
+----+
|   a|
|   b|
|   c|
+----+



In [126]:
rdd2 = df.rdd
rdd2.collect()

[Row(name='a'), Row(name='b'), Row(name='c')]

Remove N lines from a file while reading

In [128]:
df = spark.read.option("header", True) \
               .option("skipRows", 5) \
               .csv("sample_data/unwanted.csv")

df.show(truncate=False)

+---------------------------------------+
|# This is a sample CSV file            |
+---------------------------------------+
|# Generated for testing multi-line skip|
|# Author: OpenAI                       |
|# Date: 2025-04-10                     |
|# Below is the actual data             |
|id                                     |
|1                                      |
|2                                      |
|3                                      |
|4                                      |
|5                                      |
+---------------------------------------+



In [None]:
rdd = spark.sparkContext.textFile('sample_data/unwanted.csv',1)

def skip_first_five_lines(partition_index, lines):
    if partition_index == 0:
        return iter(list(lines)[6:])  # Skip first 5 lines in first partition
    else:
        return lines

filtered_rdd = rdd.mapPartitionsWithIndex(skip_first_five_lines)
filtered_rdd.collect()

header = filtered_rdd.first().split(",")
data_rdd = filtered_rdd.filter(lambda line: line != ",".join(header))


In [137]:
from pyspark.sql import Row
rows_rdd = data_rdd.map(lambda line: line.split(",")).map(lambda parts: Row(*parts))

rows_rdd.collect()

# Why are we using Row()?
# When creating a DataFrame from an RDD, Spark expects each record (row) to be:

# a tuple, or

# a Row object

# This helps Spark associate values with column names when a schema is defined.

[<Row('1', 'Alice', 'New York')>,
 <Row('2', 'Bob', 'Los Angeles')>,
 <Row('3', 'Charlie', 'Chicago')>,
 <Row('4', 'Diana', 'Houston')>,
 <Row('5', 'Evan', 'Phoenix')>]

In [None]:
df = spark.createDataFrame(rows_rdd, header)
df.show()

Read from multiple directories omitting some 

method1:
list of input paths
["data1/*.csv","data2/*.csv"]

method2:
use regex pattern
"data[1-3]*/*.csv"

method3:
use curly braces
"data{1,2,3}*/*.csv"

Check if DF is empty or not

In [146]:
input = spark.read.csv('sample_data/input1.csv',header=True,inferSchema=True)

input.show()

+--------+---+
|    Name|Age|
+--------+---+
| Monisha| 23|
|  Arvind| 24|
|Rishitha| 24|
|  Anusha| 24|
| Gayatri| 25|
+--------+---+



In [None]:
records = input.filter('age > 26')
records.show()

records.count()


+----+---+
|Name|Age|
+----+---+
+----+---+



In [161]:
records.first()

In [159]:
if(not records.count()):
    print('DF is empty')
else:
    print('DF is not empty')

DF is empty


In [162]:
if(not records.first()):
    print('DF is empty')
else:
    print('DF is not empty')

DF is empty


In [163]:
if(not records.take(0)):
    print('DF is empty')
else:
    print('DF is not empty')

DF is empty


In [156]:
if(records.rdd.isEmpty()):
    print('DF is empty')
else:
    print('DF is not empty')

DF is empty


In [None]:
# df.rdd.isEmpty()	Checks if the underlying RDD is empty (efficient)	Large DataFrames (best for performance)
# df.isEmpty()	Checks if the DataFrame is empty (available in Spark 3.x and later)	Recommended for Spark 3.x
# df.count()	Returns row count (slower, scans the entire DataFrame)	Small DataFrames, less efficient
# df.head() / df.take(1)	Checks if the first row exists (quick check)	Quick, small DataFrames

Accumulators - Alternative for df.count

A PySpark accumulator is a shared variable that workers can add to, but only the driver can read from.

✅ Use it to:

Count rows

Count errors, nulls, or bad data

Track totals (e.g., bytes processed)

Workers can only add	No read access inside workers
Only reliable in actions	They only update when an action is triggered (like .collect(), .count())
No guarantees in transformations	Lazy eval means accumulator values aren't reliable in .map() unless an action forces execution


🧠 When to Use df.count()
✅ Ideal for:
Total number of rows

Size check after filtering or joins

Fast, declarative counting

df.filter("country = 'India'").count()

🛠 When to Use Accumulators

✅ Ideal for:
Counting specific conditions (e.g. number of rows with nulls, invalid formats)

Tracking custom metrics during transformation

Debugging data pipeline quality issues

error_count = sc.accumulator(0)

def validate(row):
    global error_count
    if row['email'] is None:
        error_count += 1
    return row

df.rdd.map(validate).count()
print("Null emails:", error_count.value)

In [14]:
from pyspark.sql.functions import spark_partition_id

record_count = spark.sparkContext.accumulator(0)
age_count = spark.sparkContext.accumulator(0)

bank = spark.read.format('csv').option('header',True).option('inferSchema',True).load('sample_data/BankChurners.csv')

bank.count()

10127

In [10]:
bank.rdd.foreach(lambda x:record_count.add(1))

print(record_count.value)

10127


In [15]:
bank.filter('Customer_Age > 40').count()

7730

In [12]:
age_count = spark.sparkContext.accumulator(0)

def check(row):
    if(row['Customer_Age']>40):
        age_count.add(1)
    return row

bank.rdd.map(check).collect()

print(age_count.value)

7730


Masking Data - Sensitive data

Use UDF

In [27]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf,col

schema = StructType(
    [
    StructField('rollno',IntegerType()),
    StructField('name',StringType()),
    StructField('mobile',StringType())
    ]
)

unmask = spark.read.schema(schema=schema).csv('sample_data/duplicates.csv',header=True)

unmask.show()

unmask.printSchema()

def mobile_mask(col):
    new = col[0:2] + ('*' * 7) + col[-2] + col[-1]
    return new
    
# print(mobile_mask('9876543210'))

mask_udf = udf(mobile_mask,StringType())

mask_df = unmask.withColumn('mobile_masked',mask_udf(col('mobile')))

mask_df.show()

+------+------------+----------+
|rollno|        name|    mobile|
+------+------------+----------+
|   101|    John Doe|9876543210|
|   102|  Jane Smith|9123456789|
|   103|Robert Brown|9988776655|
|   104| Alice Green|9012345678|
|   101|    John Doe|9876543210|
|   102|  Jane Smith|9123456789|
|   103|Robert Brown|9988776655|
|   104| Alice Green|9012345678|
+------+------------+----------+

root
 |-- rollno: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- mobile: string (nullable = true)

+------+------------+----------+-------------+
|rollno|        name|    mobile|mobile_masked|
+------+------------+----------+-------------+
|   101|    John Doe|9876543210|  98*******10|
|   102|  Jane Smith|9123456789|  91*******89|
|   103|Robert Brown|9988776655|  99*******55|
|   104| Alice Green|9012345678|  90*******78|
|   101|    John Doe|9876543210|  98*******10|
|   102|  Jane Smith|9123456789|  91*******89|
|   103|Robert Brown|9988776655|  99*******55|
|   104| Alice

Transaction table

In [28]:
data = [
    (101, 'credit', 500.00),
    (102, 'debit', 200.00),
    (101, 'debit', 150.00),
    (103, 'credit', 1000.00),
    (102, 'credit', 300.00),
    (104, 'debit', 120.00),
    (105, 'credit', 750.00),
    (103, 'debit', 400.00),
    (104, 'credit', 250.00),
    (101, 'credit', 100.00),
]

columns = ['customer_id', 'transaction_type', 'amount']
df = spark.createDataFrame(data, columns)

df.show()

+-----------+----------------+------+
|customer_id|transaction_type|amount|
+-----------+----------------+------+
|        101|          credit| 500.0|
|        102|           debit| 200.0|
|        101|           debit| 150.0|
|        103|          credit|1000.0|
|        102|          credit| 300.0|
|        104|           debit| 120.0|
|        105|          credit| 750.0|
|        103|           debit| 400.0|
|        104|          credit| 250.0|
|        101|          credit| 100.0|
+-----------+----------------+------+



In [None]:
from pyspark.sql.functions import when, col

df1 = df.withColumn('amount_chk',when(col('transaction_type')=='credit',col('amount')).otherwise(-1*col('amount')))

df.show()

df1.select('customer_id','amount_chk').groupBy('customer_id').sum('amount_chk').show()

+-----------+----------------+------+
|customer_id|transaction_type|amount|
+-----------+----------------+------+
|        101|          credit| 500.0|
|        102|           debit| 200.0|
|        101|           debit| 150.0|
|        103|          credit|1000.0|
|        102|          credit| 300.0|
|        104|           debit| 120.0|
|        105|          credit| 750.0|
|        103|           debit| 400.0|
|        104|          credit| 250.0|
|        101|          credit| 100.0|
+-----------+----------------+------+

+-----------+---------------+
|customer_id|sum(amount_chk)|
+-----------+---------------+
|        101|          450.0|
|        102|          100.0|
|        103|          600.0|
|        104|          130.0|
|        105|          750.0|
+-----------+---------------+



In [34]:
from pyspark.sql.functions import sum
df1.groupBy('customer_id').agg(sum('amount_chk')).alias('total balance').show()

+-----------+---------------+
|customer_id|sum(amount_chk)|
+-----------+---------------+
|        101|          450.0|
|        102|          100.0|
|        103|          600.0|
|        104|          130.0|
|        105|          750.0|
+-----------+---------------+



In [37]:
df2 = df1.groupBy('customer_id').pivot('transaction_type').agg(sum('amount'))

df2.show()

df2.selectExpr('customer_id','coalesce(credit,0)-coalesce(debit,0)').show()

+-----------+------+-----+
|customer_id|credit|debit|
+-----------+------+-----+
|        103|1000.0|400.0|
|        104| 250.0|120.0|
|        105| 750.0| null|
|        101| 600.0|150.0|
|        102| 300.0|200.0|
+-----------+------+-----+

+-----------+------------------------------------------+
|customer_id|(coalesce(credit, 0) - coalesce(debit, 0))|
+-----------+------------------------------------------+
|        103|                                     600.0|
|        104|                                     130.0|
|        105|                                     750.0|
|        101|                                     450.0|
|        102|                                     100.0|
+-----------+------------------------------------------+



union and unionByName

union 

merge the 2 dataframes with respect to the position of the column and datatype of the column 

requires both the tables to have same no of columns

column names should be of same order 

but unionByName will match based on the column names and even if there are different no of columns, it will take null 

Repartition(col) Vs PartitionBy(col)

PartitionBy can we used only with write, creates separate folders for each partition. There wont be any shuffle with partitionBy, it will directly push the data to the folders



Read files recursively

use option('recursiveFileLookup',true)

Date time handling in schema using inferschema

use option('timestampFormat','M/d/yyy')
use option('dateformat','M/d/yyy')