In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Obtain the Spark session for this notebook: an already-active session is
# reused, otherwise a new one is started under the given application name.
spark = (
    SparkSession.builder
    .appName("Dunhumbby Practise")
    .getOrCreate()
)

# Read every column of the `aadhaar_data_csv` table; the table is expected to
# be available in the session catalog before this cell runs.
rawDF = spark.sql("select * from aadhaar_data_csv")

from pyspark.sql.functions import concat_ws, date_format, lit, unix_timestamp, to_date
from datetime import datetime
from pyspark.sql import functions as f

# Names applied positionally to the columns of the raw table (13 in total).
columnList = [
    "date", "registrar", "private_agency", "state", "district", "sub_district",
    "pin", "gender", "age", "aadhaar_generated", "rejected", "mobile_number", "email_id",
]

# Rename the columns directly on the DataFrame. The previous
# `rawDF.rdd.toDF(columnList)` converted the frame to an RDD of Python rows
# and back, which serializes every row through Python and makes Spark
# re-infer the schema, only to change the column names.
inputDF = rawDF.toDF(*columnList)

# Turn the `yyyyMMdd` value in `date` into a proper date column:
# pattern -> epoch seconds -> timestamp -> date.
inputDF = inputDF.withColumn(
    "date",
    to_date(unix_timestamp(inputDF["date"], "yyyyMMdd").cast("timestamp")),
)

# Mark the cleaned frame for caching; every later cell aggregates over it.
inputDF.persist()


In [2]:
# Number of records per registrar; only the first three groups are printed.
(
    inputDF
    .groupBy("registrar")
    .count()
    .show(3)
)

# For each state: how many distinct districts and distinct sub-districts appear.
(
    inputDF
    .groupBy("state")
    .agg(
        f.countDistinct("district").alias("no_of_districts"),
        f.countDistinct("sub_district").alias("no_of_sub_districts"),
    )
    .show()
)

In [3]:
# Keep only the rows whose gender is "M", then report per state how many of
# those rows there are (non-null `gender` values) as `Male_Count`.
maleDF = inputDF.where(f.col("gender") == "M")
(
    maleDF
    .groupBy("state")
    .agg(f.count("gender").alias("Male_Count"))
    .show()
)

In [4]:
# Record count per (state, private agency), ignoring rows without an agency.
privateDF = inputDF.filter("private_agency is not null")
# Aggregate the filtered frame. Previously the grouping ran on `inputDF`, so
# `privateDF` was never used and rows with a null agency formed their own
# group, which could then be reported as a state's most active agency.
agencyByState = privateDF.groupBy("state", "private_agency").count()
agencyByState.show(3)


from pyspark.sql.functions import col
# Show most active private agency in each state.
# `df` used to repeat the aggregation above; it is kept as a name but now
# simply points at the frame that was already computed.
df = agencyByState
# Highest per-agency record count within each state.
maxAgencyDF = df.groupBy(col("state")).agg(f.max(col("count")).alias("count"))
# Join the per-state maximum back to the per-agency counts to recover which
# agency (or agencies, when tied) reached that maximum.
mostActiveAgency = (
    maxAgencyDF.alias('a')
    .join(
        agencyByState.alias('b'),
        (col("a.state") == col("b.state")) & (col("b.count") == col("a.count")),
    )
    .select("a.state", "private_agency", "a.count")
)
mostActiveAgency.show()
# TODO: count unique private agency in each state (no code follows this note yet)

In [5]:
# Add up `aadhaar_generated` per state, then print the three states with the
# largest totals without truncating cell contents.
totalAadharDF = (
    inputDF
    .groupBy("state")
    .agg(f.sum("aadhaar_generated").alias("total_number"))
)
totalAadharDF.orderBy(f.desc("total_number")).show(3, truncate=False)

In [6]:
# Number of distinct `pin` values. Printed explicitly: a bare expression that
# is not the last statement of a notebook cell is evaluated but never
# displayed, so the count used to be computed and then thrown away.
print(inputDF.select("pin").distinct().count())

# For states whose name contains "Uttar Pradesh" or "Maharashtra", show the
# number of rows with a non-null `rejected` value per state.
# NOTE(review): f.count tallies rows rather than adding up the values in
# `rejected`; confirm whether f.sum("rejected") was intended here.
(
    inputDF
    .filter(
        inputDF["state"].contains("Uttar Pradesh")
        | inputDF["state"].contains("Maharashtra")
    )
    .groupBy("state")
    .agg(f.count("rejected"))
    .show()
)

In [7]:
# Share of male records per state, keeping the three states with the highest share.
# NOTE(review): both aggregates use f.count (non-null rows), while the
# per-state total in `totalAadharDF` uses f.sum("aadhaar_generated"); confirm
# which measure the percentage should be based on.
maleDF = (
    inputDF
    .filter(inputDF["gender"] == "M")
    .groupBy("state")
    .agg(f.count("aadhaar_generated").alias("male aadhar"))
)
totalDF = inputDF.groupBy("state").agg(f.count("aadhaar_generated").alias("total aadhar"))

# Join on the column name instead of `totalDF["state"] == maleDF["state"]`.
# Both frames derive from `inputDF`, so those two column handles point at the
# same underlying attribute and make the join condition and the following
# select an ambiguous self-join. Joining by name is still an inner join and
# yields a single `state` column.
joinDF = totalDF.join(maleDF, on="state").select("state", "total aadhar", "male aadhar")

# The ratio is already a Column expression, so the former lit(...) wrapper
# around it was redundant.
joinWithPercent = joinDF.withColumn(
    "percent Data",
    (joinDF["male aadhar"] / joinDF["total aadhar"]) * 100,
)
top3DF = joinWithPercent.orderBy(joinWithPercent["percent Data"].desc()).limit(3)
# Cache the small result; the next cell collects from it.
top3DF.persist()

In [8]:
# State names of the rows in `top3DF`, as a flat Python list of values.
# The previous `.rdd.map(list).collect()` returned one single-element list per
# row ([[s1], [s2], [s3]]), which is not the flat sequence of scalar values
# that the `isin` filter below needs.
stateList = [row["state"] for row in top3DF.select("state").collect()]
# Follow-up kept disabled, as before: female rows for the states above.
# The gender comparison is parenthesized because `&` binds tighter than `==`.
#femaleDF=inputDF.filter(inputDF["state"].isin(stateList) & (inputDF["gender"]=="F"))
#femaleDF.show()
