 ## Merge DataFrame in Spark

#### First Way
By adding extra column with null values withColumn, lit

In [19]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("two").getOrCreate()

In [20]:
df1 = spark.read.csv('DataSets/file1.csv',header=True,sep='|')
df1.show()

+-----------------+---+
|             Name|Age|
+-----------------+---+
|Nikhil,Pidiparthy| 26|
|      Akhil,Peddi| 31|
|         Sony,Bil| 35|
|         Demo,Raj| 21|
|      Nibba,Nibbi| 16|
+-----------------+---+



In [21]:
spark.read.csv?

[0;31mSignature:[0m
[0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mcsv[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mpath[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mschema[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0msep[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mencoding[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mquote[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mescape[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcomment[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mheader[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0minferSchema[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mignoreLeadingWhiteSpace[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mignoreTrailingWhiteSpace[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;

In [22]:
df2 = spark.read.csv('DataSets/file2.csv',header=True,sep='|')
df2.show()

+-----------+---+------+
|       Name|Age|Gender|
+-----------+---+------+
|Tamar,Reddy| 26|  Male|
| Roti,Pedda| 11|Female|
|  Billa,Pow| 35|  Male|
|  Kiddy,Bid| 21|  Male|
|Vaddi,Naddi| 16|Female|
+-----------+---+------+



In [23]:
df1.union(df2)

AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 2 columns and the second table has 3 columns;
'Union false, false
:- Relation [Name#223,Age#224] csv
+- Relation [Name#254,Age#255,Gender#256] csv


In [24]:
from pyspark.sql.functions import lit

df_1 = df1.withColumn('Gender',lit(None))
df_1.show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|Nikhil,Pidiparthy| 26|  null|
|      Akhil,Peddi| 31|  null|
|         Sony,Bil| 35|  null|
|         Demo,Raj| 21|  null|
|      Nibba,Nibbi| 16|  null|
+-----------------+---+------+



In [25]:
df_1.union(df2).show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|Nikhil,Pidiparthy| 26|  null|
|      Akhil,Peddi| 31|  null|
|         Sony,Bil| 35|  null|
|         Demo,Raj| 21|  null|
|      Nibba,Nibbi| 16|  null|
|      Tamar,Reddy| 26|  Male|
|       Roti,Pedda| 11|Female|
|        Billa,Pow| 35|  Male|
|        Kiddy,Bid| 21|  Male|
|      Vaddi,Naddi| 16|Female|
+-----------------+---+------+



### Second way
Applying own schema

In [26]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

schema = StructType (
[
StructField('Name',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Gender',StringType(),True)
]
)

df3 = spark.read.csv('DataSets/file1.csv',sep ='|',header = True, schema=schema)
df3.show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|Nikhil,Pidiparthy| 26|  null|
|      Akhil,Peddi| 31|  null|
|         Sony,Bil| 35|  null|
|         Demo,Raj| 21|  null|
|      Nibba,Nibbi| 16|  null|
+-----------------+---+------+



23/02/23 10:33:40 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 3
CSV file: file:///config/workspace/PySpark%20Interview%20Questions/file1.csv


In [27]:
df4 = spark.read.csv('DataSets/file2.csv',sep ='|',header = True, schema=schema)
df4.show()

+-----------+---+------+
|       Name|Age|Gender|
+-----------+---+------+
|Tamar,Reddy| 26|  Male|
| Roti,Pedda| 11|Female|
|  Billa,Pow| 35|  Male|
|  Kiddy,Bid| 21|  Male|
|Vaddi,Naddi| 16|Female|
+-----------+---+------+



In [28]:
df3.union(df4).show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|Nikhil,Pidiparthy| 26|  null|
|      Akhil,Peddi| 31|  null|
|         Sony,Bil| 35|  null|
|         Demo,Raj| 21|  null|
|      Nibba,Nibbi| 16|  null|
|      Tamar,Reddy| 26|  Male|
|       Roti,Pedda| 11|Female|
|        Billa,Pow| 35|  Male|
|        Kiddy,Bid| 21|  Male|
|      Vaddi,Naddi| 16|Female|
+-----------------+---+------+



23/02/23 10:33:48 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 3
CSV file: file:///config/workspace/PySpark%20Interview%20Questions/file1.csv


### Thrid Way

with Outer Join

In [30]:
df5 = spark.read.csv('DataSets/file1.csv',header=True,sep='|')

df6 = spark.read.csv('DataSets/file2.csv',header=True,sep='|')

In [None]:
df5.show()

+-----------------+---+
|             Name|Age|
+-----------------+---+
|Nikhil,Pidiparthy| 26|
|      Akhil,Peddi| 31|
|         Sony,Bil| 35|
|         Demo,Raj| 21|
|      Nibba,Nibbi| 16|
+-----------------+---+



In [None]:
df6.show()

+-----------+---+------+
|       Name|Age|Gender|
+-----------+---+------+
|Tamar,Reddy| 26|  Male|
| Roti,Pedda| 11|Female|
|  Billa,Pow| 35|  Male|
|  Kiddy,Bid| 21|  Male|
|Vaddi,Naddi| 16|Female|
+-----------+---+------+



In [None]:
df_o = df5.join(df6,on = ['Name','Age'],how = 'Outer')
df_o.show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|      Akhil,Peddi| 31|  null|
|        Billa,Pow| 35|  Male|
|         Demo,Raj| 21|  null|
|        Kiddy,Bid| 21|  Male|
|      Nibba,Nibbi| 16|  null|
|Nikhil,Pidiparthy| 26|  null|
|       Roti,Pedda| 11|Female|
|         Sony,Bil| 35|  null|
|      Tamar,Reddy| 26|  Male|
|      Vaddi,Naddi| 16|Female|
+-----------------+---+------+



### Final Way
For more columns

In [31]:
df7 = spark.read.csv('DataSets/file1.csv',header=True,sep='|')
print(df7.columns)

df8 = spark.read.csv('DataSets/file2.csv',header=True,sep='|')
print(df8.columns)

['Name', 'Age']
['Name', 'Age', 'Gender']


In [33]:
list_1 = set(df7.columns) - set(df8.columns)
list_2 = set(df8.columns) - set(df7.columns)

In [34]:
print(list_1)
print(list_2)

set()
{'Gender'}


In [35]:
for each in list_1:
    df8 = df8.withColumn(each,lit(None))
df8.show()


+-----------+---+------+
|       Name|Age|Gender|
+-----------+---+------+
|Tamar,Reddy| 26|  Male|
| Roti,Pedda| 11|Female|
|  Billa,Pow| 35|  Male|
|  Kiddy,Bid| 21|  Male|
|Vaddi,Naddi| 16|Female|
+-----------+---+------+



In [36]:
for each in list_2:
    df7 = df7.withColumn(each,lit(None))
df7.show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|Nikhil,Pidiparthy| 26|  null|
|      Akhil,Peddi| 31|  null|
|         Sony,Bil| 35|  null|
|         Demo,Raj| 21|  null|
|      Nibba,Nibbi| 16|  null|
+-----------------+---+------+



In [37]:
Fdf = df7.union(df8)
Fdf.show()

+-----------------+---+------+
|             Name|Age|Gender|
+-----------------+---+------+
|Nikhil,Pidiparthy| 26|  null|
|      Akhil,Peddi| 31|  null|
|         Sony,Bil| 35|  null|
|         Demo,Raj| 21|  null|
|      Nibba,Nibbi| 16|  null|
|      Tamar,Reddy| 26|  Male|
|       Roti,Pedda| 11|Female|
|        Billa,Pow| 35|  Male|
|        Kiddy,Bid| 21|  Male|
|      Vaddi,Naddi| 16|Female|
+-----------------+---+------+

