In [1]:
import findspark
findspark.init('/usr/spark2.4.3')

In [2]:
from pyspark.sql import SparkSession

In [3]:
ss = SparkSession.builder.appName("test").getOrCreate()

In [4]:
df = ss.read.csv(<path>)

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
from datetime import datetime
# import streamlit as st

In [6]:
# Creating table products
# File location and type
file_location = "/user/kolpurath6035/registered_companies.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = ss.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [7]:
from pyspark.sql import functions as F
df = df.withColumn("date_of_registration",F.date_format(F.to_date("date_of_registration", "dd-MM-yyyy"), "yyyy-MM-dd"))
df = df.withColumn("latest_year_annual_return",F.date_format(F.to_date("latest_year_annual_return", "dd-MM-yyyy"), "yyyy-MM-dd"))

df.select("date_of_registration").printSchema()

root
 |-- date_of_registration: string (nullable = true)



In [8]:
df = df.withColumn("PAIDUP_CAPITAL",df.PAIDUP_CAPITAL.cast(IntegerType()))

In [10]:
# 1. Based on the class display the number of companies
df1 = df.select("corporate_identification_number","company_class")
df1 = df1.groupBy("company_class").count()
df1.show()

+--------------------+-------+
|       company_class|  count|
+--------------------+-------+
|              Public| 137612|
|                null|   5078|
|             Private|1819264|
|Private(One Perso...|  30216|
+--------------------+-------+



In [11]:
df1.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [12]:
# 2. list the number of companies that had been registered in each decade
@udf(returnType=StringType())
def find_decade(year):
    if year is None:
        return None
    else:
        dec1 = (year - (year % 10))
        dec2 = dec1+9
        dec = (f"{dec1} - {dec2}")
        if(dec1<1000 or dec1>2023):
            return "invalid_year"
        else:
            return dec
df2 = df.select("corporate_identification_number","company_name",year("date_of_registration").alias("year"))
df2 = df2.withColumn("decade", find_decade(col("year")))
decade_df = df2
df2 = df2.groupBy("decade").agg(count("*").alias("no_of_companies")).orderBy(col("decade"))
df2.show()

+------------+---------------+
|      decade|no_of_companies|
+------------+---------------+
|        null|           2525|
| 1850 - 1859|              1|
| 1860 - 1869|              3|
| 1870 - 1879|             24|
| 1880 - 1889|             36|
| 1890 - 1899|             57|
| 1900 - 1909|            887|
| 1910 - 1919|            934|
| 1920 - 1929|           1827|
| 1930 - 1939|           3378|
| 1940 - 1949|          10808|
| 1950 - 1959|           9475|
| 1960 - 1969|          11818|
| 1970 - 1979|          30666|
| 1980 - 1989|         143785|
| 1990 - 1999|         344661|
| 2000 - 2009|         439820|
| 2010 - 2019|         978716|
| 2020 - 2029|          12747|
|invalid_year|              2|
+------------+---------------+



In [13]:
df2.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [14]:
# 3. find top 5 companies with highest paid up capital as a list in each leap year after 2000
@udf(returnType=StringType())
def check_year(year):
    if year is None:
        return None
    else:
        if(year<1000 or year>2023):
                return "invalid_year"
        elif (year>=2000):
            if (year%4==0):
                return "yes"
            else:
                return "no"
        else:
            return "no"
df3 = df.select("corporate_identification_number","company_name",year("date_of_registration").alias("year"),"paidup_capital")
df3 = df3.withColumn("check",check_year("year"))
df3 = df3.filter(col("check")=="yes").orderBy("corporate_identification_number","company_name","year")
df3 = df3.withColumn("rank",dense_rank().over(Window.partitionBy("year").orderBy(col("paidup_capital").desc())))
df3 = df3.filter(col("rank")<=5).orderBy("year","rank")
df3.show(500)

+-------------------------------+--------------------+----+--------------+-----+----+
|corporate_identification_number|        company_name|year|paidup_capital|check|rank|
+-------------------------------+--------------------+----+--------------+-----+----+
|           U74140MH2000PLC12...|RELOGISTICS INFRA...|2000|    2101913000|  yes|   1|
|           U85110KA2000PLC02...|NANDI ECONOMIC CO...|2000|    2090466920|  yes|   2|
|           L72200MH2000PLC13...|MW UNITEXX LIMITE...|2000|    2087292760|  yes|   3|
|           L85110KA2000PLC02...|NARAYANA HRUDAYAL...|2000|    2043608040|  yes|   4|
|           U45200DL2000PLC15...|PIPAVAV RAILWAY C...|2000|    1960000200|  yes|   5|
|           U74210PN2004PTC14...|EON HADAPSAR INFR...|2004|    2131500000|  yes|   1|
|           U74140KA2004PTC03...|SANYO BPL PRIVATE...|2004|    2089333780|  yes|   2|
|           U45200TG2004PLC04...|GKC PROJECTS LIMI...|2004|    2050787020|  yes|   3|
|           U28910MH2004PLC20...|FIRST FORGE LIMIT...|

In [15]:
df3.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

KeyboardInterrupt: 

In [21]:
# 4. find top 5 companies that has highest paid up capital in each state
df4 = df.select("corporate_identification_number","company_name","registered_state","paidup_capital")
df4 = df4.withColumn("rank",dense_rank().over(Window.partitionBy("registered_state").orderBy(col("paidup_capital").desc())))
df4 = df4.filter(col("rank")<=1).orderBy("registered_state","rank")
df4.show(500)

+-------------------------------+--------------------+--------------------+--------------+----+
|corporate_identification_number|        company_name|    registered_state|paidup_capital|rank|
+-------------------------------+--------------------+--------------------+--------------+----+
|           U74999AN1988SGC00...|ANDAMAN AND NICOB...|Andaman and Nicob...|     109700600|   1|
|           U15209AP2006PTC04...|SRI VIJAYA VISAKH...|      Andhra Pradesh|    2129403500|   1|
|           U40101AR2013PLC00...|ARUNACHAL HYDRO P...|   Arunachal Pradesh|    1643000000|   1|
|           L23209AS1974GOI00...|BONGAIGAON REFINE...|               Assam|    1998179000|   1|
|           U40100BR1982SGC00...|BIHAR STATE HYDRO...|               Bihar|     990400000|   1|
|           U63010CH2013GOI03...|PUNJAB LOGISTICS ...|          Chandigarh|    1985150000|   1|
|           U13100CT2008GOI02...|NMDC-CMDC LIMITED   |         Chattisgarh|    1928377180|   1|
|           L74999DN1987PLC00...|K-LIFES

In [None]:
df4.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [None]:
# 5. which state has highest companies registered
df5 = df.select("corporate_identification_number","company_name","registered_state")
df5 = df5.groupBy("registered_state").agg(count("*").alias("num_of_companies")).orderBy(col("num_of_companies").desc())
df5.show(1)

In [None]:
df5.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [None]:
# 6. find the year on which each state has their maximum registration
df6 = df.select("corporate_identification_number","company_name","registered_state",year("date_of_registration").alias("year"))
df6 = df6.groupBy("registered_state","year").agg(count("*").alias("no_of_companies"))
df6 = df6.withColumn("rank",dense_rank().over(Window.partitionBy("registered_state").orderBy(col("no_of_companies").desc())))
df6 = df6.filter(col("rank")==1).orderBy(col("registered_state"))
df6.show()

In [None]:
df6.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [22]:
# 7. find the sector that is most common in each state
df7 = df.select("corporate_identification_number","company_name","registered_state","company_class")
df7 = df7.groupBy("registered_state","company_class").agg(count("*").alias("no_of_companies"))
df7 = df7.withColumn("rank",dense_rank().over(Window.partitionBy("registered_state").orderBy(col("no_of_companies").desc())))
df7 = df7.filter(col("rank")==1).orderBy(col("registered_state"))
df7.show()


+--------------------+--------------------+---------------+----+
|    registered_state|       company_class|no_of_companies|rank|
+--------------------+--------------------+---------------+----+
|            Nagaland|             Private|            560|   1|
|            Nagaland|              Public|             49|   2|
|            Nagaland|Private(One Perso...|             11|   3|
|           Karnataka|             Private|         117309|   1|
|           Karnataka|              Public|           4811|   2|
|           Karnataka|Private(One Perso...|           3191|   3|
|           Karnataka|                null|            468|   4|
|Dadra and Nagra H...|             Private|            442|   1|
|Dadra and Nagra H...|              Public|            104|   2|
|Dadra and Nagra H...|Private(One Perso...|              4|   3|
|              Kerala|             Private|          51449|   1|
|              Kerala|              Public|           3876|   2|
|              Kerala|Pri

In [23]:
df7.rdd.coalesce(1).toDF().write.option("header",True).mode("overwrite").csv(<path>)

In [None]:
df8.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [18]:
# 9. list the companies that have been recently enrolled in each state
df9 = df.select("corporate_identification_number","company_name","registered_state",col("date_of_registration").alias("date"))
df9 = df9.withColumn("rank",dense_rank().over(Window.partitionBy("registered_state").orderBy(col("date").desc())))
df9 = df9.filter(col("rank")<=15).orderBy("registered_state","rank")
df9.show()

+-------------------------------+--------------------+--------------------+----------+----+
|corporate_identification_number|        company_name|    registered_state|      date|rank|
+-------------------------------+--------------------+--------------------+----------+----+
|           U51909AN2020PTC00...|ANDAMAN RUPSHA MA...|Andaman and Nicob...|2020-01-13|   1|
|           U92410AN2020PTC00...|LILIUM ADVENTURE ...|Andaman and Nicob...|2020-01-03|   2|
|           U74999AN2019PTC00...|TAURINE FILMS AND...|Andaman and Nicob...|2019-12-20|   3|
|           U70109AN2019PTC00...|KMS DREAM CREATIO...|Andaman and Nicob...|2019-12-16|   4|
|           U60221AN2019PTC00...|DE WHALEN PARADIS...|Andaman and Nicob...|2019-12-06|   5|
|           U63031AN2019PTC00...|INFO INDIA TOUR A...|Andaman and Nicob...|2019-11-21|   6|
|           U63031AN2019PTC00...|PARWATI MULTI-SEC...|Andaman and Nicob...|2019-11-14|   7|
|           U26990AN2019PTC00...|MITTAL ROOFING PR...|Andaman and Nicob...|2019-

In [19]:
df9.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [None]:
# 10. find the count of companies per company_status
df10 = df.select("corporate_identification_number","company_name","company_status")
df10 = df10.groupBy("company_status").agg(count("*").alias("no_of_companies")).orderBy("company_status")
df10.show()

In [None]:
df10.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [None]:
# 11. find the top 2 companies per principal business activity in 19th century
df11 = df.select("corporate_identification_number","company_name","paidup_capital",year("date_of_registration").alias("year"),"PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN")
df11 = df11.filter((col("year")>1800) & (col("year")<1900))
df11 = df11.withColumn("rank",dense_rank().over(Window.partitionBy("PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN").orderBy(col("paidup_capital").desc())))
df11 = df11.filter(col("rank")<=2).orderBy("PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN","rank")
df11.show(500)

In [None]:
df11.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [None]:
# 12. find the company with higest paidup capital in each decade
df12 = decade_df.select("corporate_identification_number", "decade")\
.join(df.select("corporate_identification_number","company_name","paidup_capital"),["corporate_identification_number"],"inner")
df12_2 = df12.groupBy("decade").agg(max(col("paidup_capital")).alias("paidup_capital"))
df12 = df12.select("corporate_identification_number", "company_name", "paidup_capital", "decade") \
    .join(df12_2.select("*"), ["paidup_capital", "decade"], "inner").orderBy("decade")
df12.show()

In [None]:
df12.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv(<path>)

In [None]:
df.columns