In [None]:
# load data: CSV with a header row. No schema is inferred, so every column
# arrives as string and is cast explicitly further down.
df = spark.read.csv("/mnt/projectteam14/Final_Data.csv", header=True)

# Cap eagerly-evaluated DataFrame reprs at 14 columns (only has an effect when
# spark.sql.repl.eagerEval.enabled is on).
spark.conf.set("spark.sql.repl.eagerEval.maxNumColumns", 14)

In [None]:
# count no of rows and columns (df.count() triggers a full Spark job)
n_rows = df.count()
n_cols = len(df.columns)
print(" rows", n_rows, "\n", "columns", n_cols)

 rows 51729905 
 columns 14


In [None]:
# preview the first 10 raw rows
df.show(10)

+---+------------+----------+-------------+-----------+-------------+------+-----------------+-----------------------+-----------------------+-------------------------+-------------+------------+----------+
|_c0|   Commodity|State Name|District Name|Market Name|      Variety| Group|Arrivals (Tonnes)|Min Price (Rs./Quintal)|Max Price (Rs./Quintal)|Modal Price (Rs./Quintal)|Reported Date|Unnamed: 0.1|Unnamed: 0|
+---+------------+----------+-------------+-----------+-------------+------+-----------------+-----------------------+-----------------------+-------------------------+-------------+------------+----------+
|  0|ChennangiDal|   Gujarat|        Surat|    Songadh|        Other|Pulses|              0.5|                 5500.0|                   5905|                   5705.0|  04 Apr 2017|        NULL|      NULL|
|  1|ChennangiDal|   Gujarat|        Surat|    Songadh|   Gram Chapa|Pulses|              1.1|                 3560.0|                   3965|                   3775.0|  04

In [None]:
# check datatypes: df.dtypes is a list of (column name, type string) pairs.
# Every column is 'string' here because the CSV was read without a schema.
df.dtypes

[('_c0', 'string'),
 ('Commodity', 'string'),
 ('State Name', 'string'),
 ('District Name', 'string'),
 ('Market Name', 'string'),
 ('Variety', 'string'),
 ('Group', 'string'),
 ('Arrivals (Tonnes)', 'string'),
 ('Min Price (Rs./Quintal)', 'string'),
 ('Max Price (Rs./Quintal)', 'string'),
 ('Modal Price (Rs./Quintal)', 'string'),
 ('Reported Date', 'string'),
 ('Unnamed: 0.1', 'string'),
 ('Unnamed: 0', 'string')]

In [None]:
# drop unnecessary columns: _c0 is a 0,1,2,... row counter and the two
# "Unnamed" columns are NULL in the preview, so none carry data.
index_cols = ["_c0", "Unnamed: 0.1", "Unnamed: 0"]
df = df.drop(*index_cols)
df

DataFrame[Commodity: string, State Name: string, District Name: string, Market Name: string, Variety: string, Group: string, Arrivals (Tonnes): string, Min Price (Rs./Quintal): string, Max Price (Rs./Quintal): string, Modal Price (Rs./Quintal): string, Reported Date: string]

In [None]:
#checking again: the three dropped columns are gone; the remaining columns
# are all still 'string' until the numeric/date casts further down.
df.dtypes

[('Commodity', 'string'),
 ('State Name', 'string'),
 ('District Name', 'string'),
 ('Market Name', 'string'),
 ('Variety', 'string'),
 ('Group', 'string'),
 ('Arrivals (Tonnes)', 'string'),
 ('Min Price (Rs./Quintal)', 'string'),
 ('Max Price (Rs./Quintal)', 'string'),
 ('Modal Price (Rs./Quintal)', 'string'),
 ('Reported Date', 'string')]

In [None]:
# renaming columns: drop the "." from "Rs./Quintal" in the three price headers.
# Spark's col() parses "." as a nested-field separator, so removing it lets the
# later col(...) calls reference these columns without back-tick quoting.
# withColumnRenamed is a no-op when the old name is absent.
price_renames = {
    "Min Price (Rs./Quintal)": "Min Price (Rs/Quintal)",
    "Max Price (Rs./Quintal)": "Max Price (Rs/Quintal)",
    "Modal Price (Rs./Quintal)": "Modal Price (Rs/Quintal)",
}
for old_name, new_name in price_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

In [None]:
# confirm the price columns now read "Rs/Quintal" (no ".")
df.columns

['Commodity',
 'State Name',
 'District Name',
 'Market Name',
 'Variety',
 'Group',
 'Arrivals (Tonnes)',
 'Min Price (Rs/Quintal)',
 'Max Price (Rs/Quintal)',
 'Modal Price (Rs/Quintal)',
 'Reported Date']

In [None]:
# changing datatypes: cast the quantity/price columns from string to double.
# A 32-bit 'float' cast stores values such as 1.1 or 0.33 inexactly, and the
# later mean-imputation steps widen the columns to double anyway, which exposes
# that noise (1.1 -> 1.100000023841858). Cast straight to 64-bit double instead.
# Unparseable strings become null, which the null checks below pick up.
columns = ['Arrivals (Tonnes)','Min Price (Rs/Quintal)','Max Price (Rs/Quintal)','Modal Price (Rs/Quintal)']

from pyspark.sql.functions import col
for colname in columns:
    df = df.withColumn(colname, col(colname).cast('double'))
df.dtypes

[('Commodity', 'string'),
 ('State Name', 'string'),
 ('District Name', 'string'),
 ('Market Name', 'string'),
 ('Variety', 'string'),
 ('Group', 'string'),
 ('Arrivals (Tonnes)', 'float'),
 ('Min Price (Rs/Quintal)', 'float'),
 ('Max Price (Rs/Quintal)', 'float'),
 ('Modal Price (Rs/Quintal)', 'float'),
 ('Reported Date', 'string')]

In [None]:
from pyspark.sql.functions import to_date

# parse "Reported Date" strings such as "04 Apr 2017" into a DateType column
df = df.withColumn("Reported Date", to_date("Reported Date", "dd MMM yyyy"))

# checking again
df.dtypes

[('Commodity', 'string'),
 ('State Name', 'string'),
 ('District Name', 'string'),
 ('Market Name', 'string'),
 ('Variety', 'string'),
 ('Group', 'string'),
 ('Arrivals (Tonnes)', 'float'),
 ('Min Price (Rs/Quintal)', 'float'),
 ('Max Price (Rs/Quintal)', 'float'),
 ('Modal Price (Rs/Quintal)', 'float'),
 ('Reported Date', 'date')]

In [None]:
# counting null values for each column.
# One aggregation pass counts the nulls of every column at once, instead of
# one filter().count() job per column over the whole dataset.
# F.when(cond, 1) is null when cond is false, and F.count skips nulls, so each
# aggregate equals the number of rows where that column is null.
from pyspark.sql import functions as F

null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first()
for colname in df.columns:
    print(f"{colname} : {null_counts[colname]}")

Commodity : 0
State Name : 0
District Name : 0
Market Name : 0
Variety : 0
Group : 0
Arrivals (Tonnes) : 7415990
Min Price (Rs/Quintal) : 28
Max Price (Rs/Quintal) : 112
Modal Price (Rs/Quintal) : 1057
Reported Date : 0


In [None]:
# percentage of null values per column, computed in a single aggregation pass.
# The row total is named total_rows (not `count`) so it does not clash with
# pyspark.sql.functions.count, which is imported later in the notebook.
from pyspark.sql import functions as F

total_rows = df.count()
null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first()
for colname in df.columns:
    # guard against an empty DataFrame (avoids ZeroDivisionError)
    percentage = (null_counts[colname] / total_rows) * 100 if total_rows else 0.0
    print(f"{colname}: {percentage}")

Commodity: 0.0
State Name: 0.0
District Name: 0.0
Market Name: 0.0
Variety: 0.0
Group: 0.0
Arrivals (Tonnes): 14.335982252432128
Min Price (Rs/Quintal): 5.412729831999498e-05
Max Price (Rs/Quintal): 0.00021650919327997992
Modal Price (Rs/Quintal): 0.0020433055115798102
Reported Date: 0.0


In [None]:
# filling null values
# Mean imputation, step 1: per-commodity average Min Price, rounded to 2 dp.
from pyspark.sql import functions as F
# NOTE: this import shadows Python's builtin round(); later cells call the bare
# names round(...)/avg(...) on Columns and rely on it, so it stays.
from pyspark.sql.functions import avg,round

min_price_mean = F.round(F.avg('Min Price (Rs/Quintal)'), 2)
avg_min = df.groupBy('Commodity').agg(min_price_mean.alias('avg_min'))
avg_min.show()

+------------------+--------+
|         Commodity| avg_min|
+------------------+--------+
|            Papaya| 1653.23|
|PointedgourdParval| 2719.44|
|     TenderCoconut| 4861.69|
|         Thondekai| 1501.78|
|   Pepperungarbled|43546.94|
|         Pineapple| 2367.05|
|    ChillyCapsicum| 1620.91|
|           Peascod| 2377.36|
|               Tea|15667.29|
|          ChowChow| 1494.47|
|   CinamonDalchini|  3200.0|
|            Turnip|  731.93|
|             Peach|  2301.8|
|       SkinAndHide|   165.0|
|      TamarindSeed| 1729.92|
|               Yam| 2204.19|
|     ChikoosSapota| 1674.78|
|     TubeRoseLoose|   53.01|
|              Plum| 2961.36|
|              Coca|  4400.0|
+------------------+--------+
only showing top 20 rows



In [None]:
# spot-check the imputation value for one commodity (ChennangiDal)
a = avg_min.filter(F.col("Commodity") == "ChennangiDal")
a.show()

+------------+-------+
|   Commodity|avg_min|
+------------+-------+
|ChennangiDal| 5018.4|
+------------+-------+



In [None]:
# attach each row's commodity-level avg_min (a left join keeps every row)
newdf = df.join(avg_min, 'Commodity', 'left')
newdf.show()

+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+-------+
|   Commodity|State Name|District Name|Market Name|      Variety| Group|Arrivals (Tonnes)|Min Price (Rs/Quintal)|Max Price (Rs/Quintal)|Modal Price (Rs/Quintal)|Reported Date|avg_min|
+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+-------+
|ChennangiDal|   Gujarat|        Surat|    Songadh|        Other|Pulses|              0.5|                5500.0|                5905.0|                  5705.0|   2017-04-04| 5018.4|
|ChennangiDal|   Gujarat|        Surat|    Songadh|   Gram Chapa|Pulses|              1.1|                3560.0|                3965.0|                  3775.0|   2018-04-04| 5018.4|
|ChennangiDal| Karnataka|    Bangalore|  Bangalore|Chennangi Dal|Pulses|        

In [None]:
# Fill missing Min Price with the commodity average, then drop the helper column.
# coalesce(x, y) returns x unless it is null, otherwise y.
df = newdf.withColumn(
    'Min Price (Rs/Quintal)',
    F.coalesce(newdf['Min Price (Rs/Quintal)'], newdf['avg_min']),
).drop('avg_min')

# rechecking nullcount for Min Price (Rs/Quintal)
nc = df.filter(col('Min Price (Rs/Quintal)').isNull()).count()
nc

0

In [None]:
# for max_price: per-commodity average Max Price, rounded to 2 dp
max_price_mean = F.round(F.avg('Max Price (Rs/Quintal)'), 2)
avg_max = df.groupBy('Commodity').agg(max_price_mean.alias('avg_max'))
avg_max.show()

+------------------+--------+
|         Commodity| avg_max|
+------------------+--------+
|            Papaya| 1962.03|
|PointedgourdParval| 3100.59|
|     TenderCoconut| 8196.02|
|         Thondekai| 1820.63|
|   Pepperungarbled| 44392.8|
|         Pineapple| 2997.87|
|    ChillyCapsicum| 2307.65|
|           Peascod| 2965.25|
|               Tea|16985.72|
|          ChowChow| 1847.12|
|   CinamonDalchini|  3200.0|
|            Turnip|  918.92|
|             Peach| 3070.15|
|       SkinAndHide|   175.0|
|      TamarindSeed| 1912.58|
|               Yam| 2545.28|
|     ChikoosSapota| 2268.62|
|     TubeRoseLoose|   86.73|
|              Plum| 4256.34|
|              Coca|  4400.0|
+------------------+--------+
only showing top 20 rows



In [None]:
# spot-check avg_max for ChennangiDal
b = avg_max.filter(F.col("Commodity") == "ChennangiDal")
b.show()

+------------+-------+
|   Commodity|avg_max|
+------------+-------+
|ChennangiDal|5402.31|
+------------+-------+



In [None]:
# attach each row's commodity-level avg_max (left join keeps every row)
newdf2 = df.join(avg_max, 'Commodity', 'left')
newdf2.show()

+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+-------+
|   Commodity|State Name|District Name|Market Name|      Variety| Group|Arrivals (Tonnes)|Min Price (Rs/Quintal)|Max Price (Rs/Quintal)|Modal Price (Rs/Quintal)|Reported Date|avg_max|
+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+-------+
|ChennangiDal|   Gujarat|        Surat|    Songadh|        Other|Pulses|              0.5|                5500.0|                5905.0|                  5705.0|   2017-04-04|5402.31|
|ChennangiDal|   Gujarat|        Surat|    Songadh|   Gram Chapa|Pulses|              1.1|                3560.0|                3965.0|                  3775.0|   2018-04-04|5402.31|
|ChennangiDal| Karnataka|    Bangalore|  Bangalore|Chennangi Dal|Pulses|        

In [None]:
# Fill missing Max Price with the commodity average, then drop the helper column.
df = newdf2.withColumn(
    'Max Price (Rs/Quintal)',
    F.coalesce(newdf2['Max Price (Rs/Quintal)'], newdf2['avg_max']),
).drop('avg_max')

# rechecking nullcount for Max Price (Rs/Quintal)
nc = df.filter(col('Max Price (Rs/Quintal)').isNull()).count()
nc

0

In [None]:
# for modal price: per-commodity average Modal Price, rounded to 2 dp
modal_price_mean = F.round(F.avg('Modal Price (Rs/Quintal)'), 2)
avg_modal = df.groupBy('Commodity').agg(modal_price_mean.alias('avg_modal'))
avg_modal.show()

+------------------+---------+
|         Commodity|avg_modal|
+------------------+---------+
|            Papaya|  1862.27|
|PointedgourdParval|  2929.41|
|     TenderCoconut|  6077.33|
|         Thondekai|  1665.44|
|   Pepperungarbled| 44027.92|
|         Pineapple|  2689.41|
|    ChillyCapsicum|  1978.54|
|           Peascod|   2655.4|
|               Tea| 16630.16|
|          ChowChow|  1682.46|
|   CinamonDalchini|   3200.0|
|            Turnip|   825.43|
|             Peach|   2635.8|
|       SkinAndHide|    175.0|
|      TamarindSeed|  1817.03|
|               Yam|  2384.02|
|     ChikoosSapota|  1953.48|
|     TubeRoseLoose|    67.76|
|              Plum|  3608.41|
|              Coca|   4400.0|
+------------------+---------+
only showing top 20 rows



In [None]:
# spot-check avg_modal for ChennangiDal
c = avg_modal.filter(F.col("Commodity") == "ChennangiDal")
c.show()

+------------+---------+
|   Commodity|avg_modal|
+------------+---------+
|ChennangiDal|  5225.89|
+------------+---------+



In [None]:
# attach each row's commodity-level avg_modal (left join keeps every row)
newdf3 = df.join(avg_modal, 'Commodity', 'left')
newdf3.show()

+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+---------+
|   Commodity|State Name|District Name|Market Name|      Variety| Group|Arrivals (Tonnes)|Min Price (Rs/Quintal)|Max Price (Rs/Quintal)|Modal Price (Rs/Quintal)|Reported Date|avg_modal|
+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+---------+
|ChennangiDal|   Gujarat|        Surat|    Songadh|        Other|Pulses|              0.5|                5500.0|                5905.0|                  5705.0|   2017-04-04|  5225.89|
|ChennangiDal|   Gujarat|        Surat|    Songadh|   Gram Chapa|Pulses|              1.1|                3560.0|                3965.0|                  3775.0|   2018-04-04|  5225.89|
|ChennangiDal| Karnataka|    Bangalore|  Bangalore|Chennangi Dal|Pulse

In [None]:
# Fill missing Modal Price with the commodity average, then drop the helper column.
df = newdf3.withColumn(
    'Modal Price (Rs/Quintal)',
    F.coalesce(newdf3['Modal Price (Rs/Quintal)'], newdf3['avg_modal']),
).drop('avg_modal')

# rechecking nullcount for Modal Price (Rs/Quintal)
nc = df.filter(col('Modal Price (Rs/Quintal)').isNull()).count()
nc

0

In [None]:
# for arrivals: per-commodity average Arrivals, rounded to 2 dp
arrivals_mean = F.round(F.avg('Arrivals (Tonnes)'), 2)
avg_arr = df.groupBy('Commodity').agg(arrivals_mean.alias('avg_arr'))
avg_arr.show()

+------------------+-------+
|         Commodity|avg_arr|
+------------------+-------+
|            Papaya|  13.22|
|PointedgourdParval|   5.82|
|     TenderCoconut| 342.72|
|         Thondekai|   6.21|
|   Pepperungarbled|   1.13|
|         Pineapple|  13.35|
|    ChillyCapsicum|   9.42|
|           Peascod|  12.51|
|               Tea|   4.08|
|          ChowChow|   2.37|
|   CinamonDalchini|   5.75|
|            Turnip|   2.22|
|             Peach|   3.78|
|       SkinAndHide|  120.0|
|      TamarindSeed|  32.77|
|               Yam|   7.35|
|     ChikoosSapota|  13.88|
|     TubeRoseLoose|  94.74|
|              Plum|   8.08|
|              Coca|    2.1|
+------------------+-------+
only showing top 20 rows



In [None]:
# spot-check avg_arr for ChennangiDal
d = avg_arr.filter(F.col("Commodity") == "ChennangiDal")
d.show()

+------------+-------+
|   Commodity|avg_arr|
+------------+-------+
|ChennangiDal|  24.31|
+------------+-------+



In [None]:
# attach each row's commodity-level avg_arr (left join keeps every row)
newdf4 = df.join(avg_arr, 'Commodity', 'left')
newdf4.show()

+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+-------+
|   Commodity|State Name|District Name|Market Name|      Variety| Group|Arrivals (Tonnes)|Min Price (Rs/Quintal)|Max Price (Rs/Quintal)|Modal Price (Rs/Quintal)|Reported Date|avg_arr|
+------------+----------+-------------+-----------+-------------+------+-----------------+----------------------+----------------------+------------------------+-------------+-------+
|ChennangiDal|   Gujarat|        Surat|    Songadh|        Other|Pulses|              0.5|                5500.0|                5905.0|                  5705.0|   2017-04-04|  24.31|
|ChennangiDal|   Gujarat|        Surat|    Songadh|   Gram Chapa|Pulses|              1.1|                3560.0|                3965.0|                  3775.0|   2018-04-04|  24.31|
|ChennangiDal| Karnataka|    Bangalore|  Bangalore|Chennangi Dal|Pulses|        

In [None]:
# Fill missing Arrivals with the commodity average and drop the helper column.
df = newdf4.withColumn(
    'Arrivals (Tonnes)',
    F.coalesce(newdf4['Arrivals (Tonnes)'], newdf4['avg_arr']),
).drop('avg_arr')

# Arrivals still null after the fill: coalesce(null, null) stays null, so these
# rows belong to commodities whose average is itself null.
nc = df.filter(col('Arrivals (Tonnes)').isNull()).count()
nc

483

In [None]:
# there are 483 null values in arrivals column -- look at those rows
nc_arr = df.where(col("Arrivals (Tonnes)").isNull())
nc_arr.show()

+---------+------------+-------------+--------------------+----------+-------+-----------------+----------------------+----------------------+------------------------+-------------+
|Commodity|  State Name|District Name|         Market Name|   Variety|  Group|Arrivals (Tonnes)|Min Price (Rs/Quintal)|Max Price (Rs/Quintal)|Modal Price (Rs/Quintal)|Reported Date|
+---------+------------+-------------+--------------------+----------+-------+-----------------+----------------------+----------------------+------------------------+-------------+
| RoseTata|NCT of Delhi|        Delhi|Flower Market, Ca...|Rose(Tata)|Flowers|             NULL|                   1.5|                   3.5|                     2.0|   2010-01-16|
| RoseTata|NCT of Delhi|        Delhi|Flower Market, Ca...|Rose(Tata)|Flowers|             NULL|                  1.75|                  3.75|                     2.0|   2010-01-19|
| RoseTata|NCT of Delhi|        Delhi|Flower Market,Gaz...|     Other|Flowers|            

In [None]:
# which commodities do the remaining null Arrivals belong to?
e = nc_arr.groupBy("Commodity").count()
e.show()

+---------+-----+
|Commodity|count|
+---------+-----+
|  Jarbara|  479|
| RoseTata|    4|
+---------+-----+



In [None]:
# avg Arrivals for each commodity (recomputed on the imputed data)
avg_arrival_expr = F.round(F.avg("Arrivals (Tonnes)"), 2).alias("avg_arrival")
grp_arr = df.groupBy("Commodity").agg(avg_arrival_expr)
grp_arr.show()

+------------------+-----------+
|         Commodity|avg_arrival|
+------------------+-----------+
|            Papaya|      13.22|
|PointedgourdParval|       5.82|
|     TenderCoconut|     342.72|
|         Thondekai|       6.21|
|   Pepperungarbled|       1.13|
|         Pineapple|      13.35|
|    ChillyCapsicum|       9.42|
|           Peascod|      12.51|
|               Tea|       4.08|
|          ChowChow|       2.37|
|   CinamonDalchini|       5.75|
|            Turnip|       2.22|
|             Peach|       3.78|
|       SkinAndHide|      120.0|
|      TamarindSeed|      32.77|
|               Yam|       7.35|
|     ChikoosSapota|      13.88|
|     TubeRoseLoose|      94.74|
|              Plum|       8.08|
|              Coca|        2.1|
+------------------+-----------+
only showing top 20 rows



In [None]:
# sum of all avg (null averages are ignored by the sum)
# NOTE: this import shadows Python's builtin sum(); a later cell calls the bare
# name sum(...) on a Column and relies on it, so it stays.
from pyspark.sql.functions import sum
total_row = grp_arr.agg(F.round(F.sum("avg_arrival"), 2)).first()
tot_arr = total_row[0]
tot_arrs = float(tot_arr)
tot_arrs

11308.75

In [None]:
# percentage of avg_arrivals: each commodity's share of the summed averages
share = col('avg_arrival') / tot_arrs * 100
per_avg_arr = grp_arr.withColumn('percentage', share)
per_avg_arr.show()

+------------------+-----------+--------------------+
|         Commodity|avg_arrival|          percentage|
+------------------+-----------+--------------------+
|            Papaya|      13.22| 0.11690063004310822|
|PointedgourdParval|       5.82|0.051464573891897875|
|     TenderCoconut|     342.72|  3.0305736708301096|
|         Thondekai|       6.21| 0.05491323090527246|
|   Pepperungarbled|       1.13|0.009992262628495634|
|         Pineapple|      13.35| 0.11805018238089975|
|    ChillyCapsicum|       9.42| 0.08329833093843263|
|           Peascod|      12.51|  0.1106223057367083|
|               Tea|       4.08| 0.03607825798607273|
|          ChowChow|       2.37| 0.02095722338896872|
|   CinamonDalchini|       5.75| 0.05084558417154858|
|            Turnip|       2.22|0.019630816845363105|
|             Peach|       3.78|  0.0334254448988615|
|       SkinAndHide|      120.0|  1.0611252348844922|
|      TamarindSeed|      32.77| 0.28977561622637343|
|               Yam|       7

In [None]:
# sanity check: the percentage shares should add up to ~100.
# Uses the qualified F.sum/F.round so the cell does not depend on another
# cell having shadowed Python's builtin sum() (builtin sum on a Column fails).
from pyspark.sql import functions as F

total_sum = per_avg_arr.agg(F.round(F.sum(F.col("percentage")), 2)).first()[0]
total_sum

100.0

In [None]:
# commodity with max percentage arrivals.
# The original only computed the maximum value, not which commodity it
# belongs to. Sorting descending and taking the first row yields both.
# desc() places nulls last, so commodities with a null average cannot win;
# ties are broken arbitrarily. Avoids importing `max`, which would shadow the
# Python builtin.
from pyspark.sql import functions as F

top_row = per_avg_arr.orderBy(F.desc("percentage")).first()
max_value = top_row["percentage"] if top_row is not None else None
max_commodity = top_row["Commodity"] if top_row is not None else None
max_commodity, max_value

6.146037360450978

In [None]:
# arrival percentage for Banana
x = per_avg_arr.filter(F.col("Commodity") == "Banana")
x.show()

+---------+-----------+-------------------+
|Commodity|avg_arrival|         percentage|
+---------+-----------+-------------------+
|   Banana|      23.77|0.21019122361003645|
+---------+-----------+-------------------+



In [None]:
# percentage arrivals for Jarbara (null if all its Arrivals are null)
e = per_avg_arr.filter(F.col("Commodity") == "Jarbara")
e.show()

+---------+-----------+----------+
|Commodity|avg_arrival|percentage|
+---------+-----------+----------+
|  Jarbara|       NULL|      NULL|
+---------+-----------+----------+



In [None]:
# percentage arrivals for RoseTata (null if all its Arrivals are null)
e = per_avg_arr.filter(F.col("Commodity") == "RoseTata")
e.show()

+---------+-----------+----------+
|Commodity|avg_arrival|percentage|
+---------+-----------+----------+
| RoseTata|       NULL|      NULL|
+---------+-----------+----------+



In [None]:
# Drop the commodities whose Arrivals are null in every row. Their average
# arrival is null (avg ignores nulls and is null only when every value is
# null), so mean imputation cannot fill them. The list is derived from grp_arr
# rather than hard-coded, so it stays correct if the input data changes.
from pyspark.sql.functions import col

all_null_commodities = [
    row["Commodity"]
    for row in grp_arr.where(col("avg_arrival").isNull()).select("Commodity").collect()
]
# isin([]) matches nothing, so an empty list keeps every row.
df = df.filter(~col("Commodity").isin(all_null_commodities))

# count of total rows
df.count()

51729422

In [None]:
# checking again null values, in one aggregation pass instead of one full scan
# per column. F.when(cond, 1) is null when cond is false and F.count skips
# nulls, so each aggregate is that column's null count.
from pyspark.sql import functions as F

null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first()
for colname in df.columns:
    print(f"{colname} : {null_counts[colname]}")

Commodity : 0
State Name : 0
District Name : 0
Market Name : 0
Variety : 0
Group : 0
Arrivals (Tonnes) : 0
Min Price (Rs/Quintal) : 0
Max Price (Rs/Quintal) : 0
Modal Price (Rs/Quintal) : 0
Reported Date : 0


In [None]:
# print the cleaned schema: numeric columns appear as double, Reported Date as date
df.printSchema()

root
 |-- Commodity: string (nullable = true)
 |-- State Name: string (nullable = true)
 |-- District Name: string (nullable = true)
 |-- Market Name: string (nullable = true)
 |-- Variety: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Arrivals (Tonnes): double (nullable = true)
 |-- Min Price (Rs/Quintal): double (nullable = true)
 |-- Max Price (Rs/Quintal): double (nullable = true)
 |-- Modal Price (Rs/Quintal): double (nullable = true)
 |-- Reported Date: date (nullable = true)



In [None]:
# preview the first 10 cleaned rows
df.show(10)

+------------+----------+-------------+-----------+-------------+------+-------------------+----------------------+----------------------+------------------------+-------------+
|   Commodity|State Name|District Name|Market Name|      Variety| Group|  Arrivals (Tonnes)|Min Price (Rs/Quintal)|Max Price (Rs/Quintal)|Modal Price (Rs/Quintal)|Reported Date|
+------------+----------+-------------+-----------+-------------+------+-------------------+----------------------+----------------------+------------------------+-------------+
|ChennangiDal|   Gujarat|        Surat|    Songadh|        Other|Pulses|                0.5|                5500.0|                5905.0|                  5705.0|   2017-04-04|
|ChennangiDal|   Gujarat|        Surat|    Songadh|   Gram Chapa|Pulses|  1.100000023841858|                3560.0|                3965.0|                  3775.0|   2018-04-04|
|ChennangiDal| Karnataka|    Bangalore|  Bangalore|Chennangi Dal|Pulses|               17.0|                46

In [None]:
# counting duplicate values.
# Tag each row with the size of its group of identical rows (all columns form
# the key). Rows in groups larger than 1 are duplicates. duplicates_df keeps
# every copy, including the first occurrence, so its count is the number of
# rows involved in duplication, not the number of surplus rows.
from pyspark.sql import Window
from pyspark.sql.functions import count

w = Window.partitionBy(*df.columns)
ddf = df.withColumn("duplicate_count", count("*").over(w))
duplicates_df = ddf.filter(col("duplicate_count") > 1)

duplicates_df.count()

5150150

In [None]:
# NOTE(review): deduplication is disabled, so the duplicate rows counted above
# remain in df and in the CSV written below. TODO: decide whether exact
# duplicate rows should be removed before the export.
# m = df.dropDuplicates()

In [None]:
# (disabled) row count after dropDuplicates; depends on the disabled `m` cell
# m.count()

In [None]:
# Write the cleaned data as CSV (with a header row) into the Final_Data
# directory, replacing any previous output. repartition(1) funnels everything
# through a single partition so Spark emits one part-file.
(
    df.repartition(1)
    .write
    .mode("overwrite")
    .option("header", 'true')
    .csv("/mnt/projectteam14/Final_Data")
)

In [None]:
# bare DataFrame repr: lists column names and types (no rows are shown)
df

DataFrame[Commodity: string, State Name: string, District Name: string, Market Name: string, Variety: string, Group: string, Arrivals (Tonnes): double, Min Price (Rs/Quintal): double, Max Price (Rs/Quintal): double, Modal Price (Rs/Quintal): double, Reported Date: date]

In [None]:
# render df with the notebook's display() helper (not defined in this notebook;
# presumably the Databricks built-in -- confirm)
display(df)

Commodity,State Name,District Name,Market Name,Variety,Group,Arrivals (Tonnes),Min Price (Rs/Quintal),Max Price (Rs/Quintal),Modal Price (Rs/Quintal),Reported Date
ChikoosSapota,Goa,North Goa,Mapusa,Other,Fruits,0.3300000131130218,600.0,800.0,700.0,2003-04-02
ChikoosSapota,Gujarat,Navsari,Chikhali,Sapota,Fruits,79.0,525.0,800.0,635.0,2003-04-04
ChikoosSapota,Gujarat,Navsari,Chikhali,Sapota,Fruits,13.88,125.0,500.0,250.0,2003-04-04
ChikoosSapota,Gujarat,Navsari,Chikhali,Sapota,Fruits,70.0,550.0,745.0,600.0,2003-04-03
ChikoosSapota,Gujarat,Navsari,Chikhali,Sapota,Fruits,13.88,100.0,525.0,200.0,2003-04-03
ChikoosSapota,Gujarat,Navsari,Chikhali,Sapota,Fruits,65.0,450.0,660.0,525.0,2003-04-02
ChikoosSapota,Gujarat,Navsari,Chikhali,Sapota,Fruits,13.88,100.0,450.0,105.0,2003-04-02
ChikoosSapota,Haryana,Faridabad,Faridabad,Other,Fruits,0.2000000029802322,1000.0,1200.0,0.0,2003-04-03
ChikoosSapota,Haryana,Faridabad,Faridabad,Other,Fruits,1.2000000476837158,800.0,1000.0,0.0,2003-04-01
ChikoosSapota,Haryana,Sirsa,Sirsa,Other,Fruits,1.5,650.0,1000.0,900.0,2003-04-04
