### Reading data from CSV files

In [65]:
# Load the base dataset; the first CSV row supplies the column names and Spark
# infers the column types from the data.
# NOTE(review): absolute, user-specific path — consider a configurable data directory.
df1 = spark.read.csv(
    "/Users/ahmed.uzzaman/Downloads/Stores.csv",
    header=True,
    inferSchema=True,
)

In [39]:
# Print df1's column names, inferred types and nullability as a tree.
df1.printSchema()

root
 |-- Store_ID : integer (nullable = true)
 |-- Store_Area: integer (nullable = true)
 |-- Items_Available: integer (nullable = true)
 |-- Daily_Customer_Count: integer (nullable = true)
 |-- Store_Sales: integer (nullable = true)



In [66]:
# Load the comparison dataset with the same reader settings as df1 (header row
# as column names, types inferred from the data).
# NOTE(review): absolute, user-specific path — consider a configurable data directory.
df2 = spark.read.csv(
    "/Users/ahmed.uzzaman/Downloads/Stores_new.csv",
    header=True,
    inferSchema=True,
)

In [40]:
# Show df2's schema as a StructType; as the cell's last expression its repr is rendered.
df2.schema

StructType(List(StructField(Store_ID ,IntegerType,true),StructField(Store_Area,IntegerType,true),StructField(Items_Available,IntegerType,true),StructField(Daily_Customer_Count,IntegerType,true),StructField(Store_Sales,IntegerType,true)))

### Finding record counts and their difference

In [8]:
# Row counts of the base (df1) and comparison (df2) datasets.
df1_count = df1.count()
df2_count = df2.count()
# Signed change in row count going from df1 to df2 (computed once, reused below).
count_diff = df2_count - df1_count

print("base count:",df1_count)
print("compare count:",df2_count)
print("# Difference:",count_diff)
# The percentage is relative to the base count; guard against an empty base
# dataset, which would otherwise raise ZeroDivisionError.
if df1_count:
    print("% Difference:",(count_diff/df1_count) * 100)
else:
    print("% Difference:","undefined (base count is 0)")

base count: 896
compare count: 899
# Difference: 3
% Difference: 0.33482142857142855


### Finding KPI for "Store_Sales" column

In [29]:
# Explicit imports instead of `from pyspark.sql.functions import *`: the wildcard
# rebinds the Python builtins sum/min/max (among others) for the rest of the
# kernel session.
from pyspark.sql import functions as F
from pyspark.sql.functions import lit  # kept unqualified: the Store_ID distinct-count cell calls lit() directly


def column_kpi(df, source, column="Store_Sales"):
    """Return a one-row DataFrame with the SUM/MIN/MAX/AVG of one column.

    Parameters
    ----------
    df : DataFrame
        Dataset to aggregate.
    source : str
        Label written to the "Source" column (e.g. "df1").
    column : str
        Name of the column to aggregate; also written to "Column_Name".

    Returns
    -------
    DataFrame
        Columns: Source, Column_Name, SUM, MIN, MAX, AVG.
    """
    return df.agg(
        F.lit(source).alias("Source"),
        F.lit(column).alias("Column_Name"),
        F.sum(column).alias("SUM"),
        F.min(column).alias("MIN"),
        F.max(column).alias("MAX"),
        F.avg(column).alias("AVG"),
    )


def prefix_kpi_columns(kpi_df, prefix, key="Column_Name"):
    """Return `kpi_df` with every column except `key` renamed to `<prefix>_<name>`."""
    renamed = [F.col(name).alias(f"{prefix}_{name}") for name in kpi_df.columns if name != key]
    return kpi_df.select(key, *renamed)


df1_kpi = column_kpi(df1, "df1")
df2_kpi = column_kpi(df2, "df2")

## Using JOIN clause
# Both KPI frames share the column names Source/SUM/MIN/MAX/AVG; joining them
# as-is yields duplicate column names that cannot be referenced unambiguously.
# Prefix each side's columns first so the side-by-side result stays usable.
merge_kpi = prefix_kpi_columns(df1_kpi, "df1").join(
    prefix_kpi_columns(df2_kpi, "df2"), "Column_Name", "FULL"
)
merge_kpi.show()

In [30]:
## Using UNION clause
# Stack the two one-row KPI frames into a single two-row table, one row per source.
# union() matches columns by position; both frames are built with the same column order.
merge = df1_kpi.union(df2_kpi)
merge.show()

+------+-----------+--------+-----+------+------------------+
|Source|Column_Name|     SUM|  MIN|   MAX|               AVG|
+------+-----------+--------+-----+------+------------------+
|   df1|Store_Sales|53178770|14920|116320| 59351.30580357143|
|   df2|Store_Sales|53317781|14920|116320|59307.876529477195|
+------+-----------+--------+-----+------+------------------+



### Counting distinct values of "Store_ID"

In [52]:
# Import lit here as well: this cell calls it, and it was otherwise only in
# scope as a side effect of an import made in the KPI cell.
from pyspark.sql.functions import countDistinct, lit

# One-row frames holding the number of distinct Store_ID values in each dataset,
# tagged with the column name so the two can be joined side by side.
distinct_df1 = df1.select(lit("Store_ID").alias("Column_Name"),countDistinct("Store_ID").alias("df1_Count"))
distinct_df2 = df2.select(lit("Store_ID").alias("Column_Name"),countDistinct("Store_ID").alias("df2_Count"))

# Join on the shared Column_Name literal to show df1_Count and df2_Count in one row.
merge_distinct = distinct_df1.join(distinct_df2,"Column_Name","FULL")
merge_distinct.show()

                                                                                

+-----------+---------+---------+
|Column_Name|df1_Count|df2_Count|
+-----------+---------+---------+
|   Store_ID|      896|      899|
+-----------+---------+---------+



### Finding records in df1 that are missing from df2

In [54]:
# Left anti join: keep the df1 rows whose Store_ID has no match in df2.
# NOTE(review): despite its name, this holds records present in df1 and absent
# from df2.
missing_in_df1 = df1.join(df2, on="Store_ID", how="left_anti")
missing_in_df1.show()

+--------+----------+---------------+--------------------+-----------+
|Store_ID|Store_Area|Items_Available|Daily_Customer_Count|Store_Sales|
+--------+----------+---------------+--------------------+-----------+
+--------+----------+---------------+--------------------+-----------+



### Finding records in df2 that are missing from df1

In [55]:
# Left anti join: keep the df2 rows whose Store_ID has no match in df1.
# NOTE(review): despite its name, this holds records present in df2 and absent
# from df1.
missing_in_df2 = df2.join(df1, on="Store_ID", how="left_anti")
missing_in_df2.show()

+--------+----------+---------------+--------------------+-----------+
|Store_ID|Store_Area|Items_Available|Daily_Customer_Count|Store_Sales|
+--------+----------+---------------+--------------------+-----------+
|     897|      2151|           1555|                 566|      65214|
|     898|      2163|           2111|                 497|      32541|
|     899|      2174|           1002|                1021|      41256|
+--------+----------+---------------+--------------------+-----------+



### Creating a temporary view on dataframes

In [56]:
# Register both DataFrames as temporary views so they can be referenced by
# name (df1_data / df2_data) in spark.sql() queries.
df1.createOrReplaceTempView("df1_data")
df2.createOrReplaceTempView("df2_data")

### Finding mismatched records for Store_Area

In [64]:
# Stores present in both datasets whose Store_Area differs.
# The query is a plain string: it has no placeholders, so no f-prefix is needed.
# `<=>` is Spark SQL's null-safe equality; negating it also reports rows where
# exactly one side is NULL, which a plain `!=` would silently drop (a comparison
# with NULL evaluates to NULL and is filtered out by WHERE).
q = """
        SELECT c.Store_ID
               ,c.Store_Area AS df1_Store_Area
               ,f.Store_Area AS df2_Store_Area
        FROM   df1_data c
               JOIN df2_data f
                 ON c.Store_ID = f.Store_ID
        WHERE  NOT (c.Store_Area <=> f.Store_Area)"""
df_mismatch = spark.sql(q)
df_mismatch.show()

+--------+--------------+--------------+
|Store_ID|df1_Store_Area|df2_Store_Area|
+--------+--------------+--------------+
|      92|          2169|          2099|
|     399|          2063|          2074|
|     467|          2229|          2118|
|     541|          2214|          2105|
|     850|          2067|          2079|
+--------+--------------+--------------+



### Finding mismatches in df1 using exceptAll

In [60]:
# Rows of df1 that have no identical row in df2 (whole rows are compared, and
# exceptAll keeps duplicates rather than de-duplicating the result).
df1.exceptAll(df2).show()

+--------+----------+---------------+--------------------+-----------+
|Store_ID|Store_Area|Items_Available|Daily_Customer_Count|Store_Sales|
+--------+----------+---------------+--------------------+-----------+
|     399|      2063|           2493|                 810|      51480|
|      92|      2169|           2617|                 600|      67080|
|     467|      2229|           2667|                 660|      87410|
|     541|      2214|           2647|                 740|      65900|
|     850|      2067|           2492|                 790|      70230|
+--------+----------+---------------+--------------------+-----------+



[Stage 116:>                                                        (0 + 2) / 2]                                                                                

### Finding mismatches in df2 using exceptAll

In [61]:
# Rows of df2 that have no identical row in df1 (whole rows are compared, and
# exceptAll keeps duplicates rather than de-duplicating the result).
df2.exceptAll(df1).show()

+--------+----------+---------------+--------------------+-----------+
|Store_ID|Store_Area|Items_Available|Daily_Customer_Count|Store_Sales|
+--------+----------+---------------+--------------------+-----------+
|     399|      2074|           2493|                 810|      51480|
|     850|      2079|           2492|                 790|      70230|
|      92|      2099|           2617|                 600|      67080|
|     897|      2151|           1555|                 566|      65214|
|     467|      2118|           2667|                 660|      87410|
|     898|      2163|           2111|                 497|      32541|
|     899|      2174|           1002|                1021|      41256|
|     541|      2105|           2647|                 740|      65900|
+--------+----------+---------------+--------------------+-----------+



### Finding duplicate values in df1

In [68]:
# Count rows per Store_ID in df1, then keep only the IDs that occur more than once.
df1_id_counts = df1.groupBy("Store_ID").count()
df1_duplicate = df1_id_counts.where("count > 1")
df1_duplicate.show()

+--------+-----+
|Store_ID|count|
+--------+-----+
+--------+-----+



### Finding duplicate values in df2

In [70]:
# Count rows per Store_ID in df2, then keep only the IDs that occur more than once.
df2_id_counts = df2.groupBy("Store_ID").count()
df2_duplicate = df2_id_counts.where("count > 1")
df2_duplicate.show()

+--------+-----+
|Store_ID|count|
+--------+-----+
|     899|    2|
+--------+-----+

