In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = (SparkSession
         .builder
         .appName("Task1")
          .getOrCreate())

In [3]:
spark

In [4]:
df =spark.read.option("header",True).json("MOCK_DATA.json")

In [5]:
df.show()

+--------------------+-------------+-----------------+-------------+--------------------+---------------+---------------+---------------+-----------------------+------------+
|     _corrupt_record|billing_class|billing_code_type|billing_codee|         description|expiration_date|negotiated_rate|negotiated_type|negotiation_arrangement|service_code|
+--------------------+-------------+-----------------+-------------+--------------------+---------------+---------------+---------------+-----------------------+------------+
|[{"billing_class"...|         null|             null|         null|                null|           null|           null|           null|                   null|        null|
|                null|          NVA|             null|   54868-6381|Occup of pk-up/va...|     2023/10/27|          $1.89|             CO|             49288-0290|          ID|
|                null|          JEQ|             SNJK|    55315-329|Posterior subluxa...|     2023/03/22|          $6.07|    

In [6]:
dropped_df = df.drop("description","expiration_date")

In [7]:
dropped_df.show()

+--------------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|     _corrupt_record|billing_class|billing_code_type|billing_codee|negotiated_rate|negotiated_type|negotiation_arrangement|service_code|
+--------------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|[{"billing_class"...|         null|             null|         null|           null|           null|                   null|        null|
|                null|          NVA|             null|   54868-6381|          $1.89|             CO|             49288-0290|          ID|
|                null|          JEQ|             SNJK|    55315-329|          $6.07|             BR|              10544-105|          CN|
|                null|          SSO|             SNLO|    13537-015|          $4.78|             BR|              53489-536|          CN|
|                null|          CK

## Removing hypen from bill

In [8]:
from pyspark.sql.functions import regexp_replace,col,expr,monotonically_increasing_id,hash,lit
hypenrem_df=dropped_df.withColumn("billing_codee",regexp_replace(col("billing_codee"),"-",""))\
                     .withColumn("negotiation_arrangement",regexp_replace(col("negotiation_arrangement"),"-",""))\
                      .withColumn('negotiated_rate', expr("replace(negotiated_rate,'$','')"))


# hyphenrem_df = dropped_df.withColumn("billing_codee", translate(col("billing_codee"), "-", ""))\
#                          .withColumn("negotiation_arrangement", translate(col("negotiation_arrangement"), "-", ""))\
#                          .withColumn("negotiated_rate", translate(col("negotiated_rate"), "$", ""))  # No escape needed

# hypenrem_df = dropped_df.withColumn('billing_codee', expr("replace(billing_codee,'-','')"))\
#         .withColumn('negotiation_arrangement', expr("replace(negotiation_arrangement,'-','')"))\
#         .withColumn('negotiated_rate', expr("replace(negotiated_rate,'$','')"))
                   

In [9]:
hypenrem_df.show()

+--------------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|     _corrupt_record|billing_class|billing_code_type|billing_codee|negotiated_rate|negotiated_type|negotiation_arrangement|service_code|
+--------------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|[{"billing_class"...|         null|             null|         null|           null|           null|                   null|        null|
|                null|          NVA|             null|    548686381|           1.89|             CO|              492880290|          ID|
|                null|          JEQ|             SNJK|     55315329|           6.07|             BR|               10544105|          CN|
|                null|          SSO|             SNLO|     13537015|           4.78|             BR|               53489536|          CN|
|                null|          CK

In [10]:
hashing_df= hypenrem_df.withColumn("service_code",hash("service_code"))
hashing_df.show()



+--------------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|     _corrupt_record|billing_class|billing_code_type|billing_codee|negotiated_rate|negotiated_type|negotiation_arrangement|service_code|
+--------------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|[{"billing_class"...|         null|             null|         null|           null|           null|                   null|          42|
|                null|          NVA|             null|    548686381|           1.89|             CO|              492880290|  -214147340|
|                null|          JEQ|             SNJK|     55315329|           6.07|             BR|               10544105|   446854828|
|                null|          SSO|             SNLO|     13537015|           4.78|             BR|               53489536|   446854828|
|                null|          CK

## Remove the row if 'billing_code' is null


In [11]:
rmvnull_df = hashing_df.dropna(subset=["billing_codee"])
rmvnull_df.show()

+---------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|_corrupt_record|billing_class|billing_code_type|billing_codee|negotiated_rate|negotiated_type|negotiation_arrangement|service_code|
+---------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|           null|          NVA|             null|    548686381|           1.89|             CO|              492880290|  -214147340|
|           null|          JEQ|             SNJK|     55315329|           6.07|             BR|               10544105|   446854828|
|           null|          SSO|             SNLO|     13537015|           4.78|             BR|               53489536|   446854828|
|           null|          CKR|             null|     42508138|           4.35|             US|               59572425|   519803259|
|           null|          CXH|             CYHC|     52533120|      

## Replace all the null values in billing_class to ‘I’


In [12]:
repna_df=rmvnull_df.fillna({"billing_class":'I'})
repna_df.show()

+---------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|_corrupt_record|billing_class|billing_code_type|billing_codee|negotiated_rate|negotiated_type|negotiation_arrangement|service_code|
+---------------+-------------+-----------------+-------------+---------------+---------------+-----------------------+------------+
|           null|          NVA|             null|    548686381|           1.89|             CO|              492880290|  -214147340|
|           null|          JEQ|             SNJK|     55315329|           6.07|             BR|               10544105|   446854828|
|           null|          SSO|             SNLO|     13537015|           4.78|             BR|               53489536|   446854828|
|           null|          CKR|             null|     42508138|           4.35|             US|               59572425|   519803259|
|           null|          CXH|             CYHC|     52533120|      

### Rename all the column as given. billing_class>>bCls  , billing_code>>bC, billing_code_type>> bCT , negotiated_rate>> negR, negotiated_type>> negT, negotiation_arrangements>> negA, service_code>> poSH

In [13]:
renamecol_df =  repna_df.withColumnRenamed('billing_class','bCls')\
                        .withColumnRenamed('billing_codee','bC')\
                        .withColumnRenamed('billing_code_type','bCT')\
                        .withColumnRenamed('negotiated_rate','negR')\
                        .withColumnRenamed('negotiated_type','negT')\
                        .withColumnRenamed('negotiation_arrangement','negA')\
                        .withColumnRenamed('service_code','poSH')

renamecol_df.show()

+---------------+----+----+---------+----+----+---------+-----------+
|_corrupt_record|bCls| bCT|       bC|negR|negT|     negA|       poSH|
+---------------+----+----+---------+----+----+---------+-----------+
|           null| NVA|null|548686381|1.89|  CO|492880290| -214147340|
|           null| JEQ|SNJK| 55315329|6.07|  BR| 10544105|  446854828|
|           null| SSO|SNLO| 13537015|4.78|  BR| 53489536|  446854828|
|           null| CKR|null| 42508138|4.35|  US| 59572425|  519803259|
|           null| CXH|CYHC| 52533120|1.35|  CA| 41250871| -214147340|
|           null| KTT|EFKT| 36800952|7.52|  FI| 59886410|  446854828|
|           null|   I|null| 11344999|2.94|  JP| 76509151|  446854828|
|           null| DAL|null|369871207|4.07|null| 11410020| 1062621904|
|           null| GDA|null| 76138106|9.05|  CF| 65954538| 1817792724|
|           null| AEX|KAEX| 53329809|3.61|  US| 50991216|  175481947|
|           null|   I|SPHI| 04960760|4.79|  PE| 49884641|-1524333386|
|           null| DU

### Add a whole new column named billing_code_modifier and rename it to 'mdH'

In [17]:
add_column = renamecol_df.withColumn("billing_code_modifier ",lit(1426636 ))
add_column.show()

+---------------+----+----+---------+----+----+---------+-----------+----------------------+
|_corrupt_record|bCls| bCT|       bC|negR|negT|     negA|       poSH|billing_code_modifier |
+---------------+----+----+---------+----+----+---------+-----------+----------------------+
|           null| NVA|null|548686381|1.89|  CO|492880290| -214147340|               1426636|
|           null| JEQ|SNJK| 55315329|6.07|  BR| 10544105|  446854828|               1426636|
|           null| SSO|SNLO| 13537015|4.78|  BR| 53489536|  446854828|               1426636|
|           null| CKR|null| 42508138|4.35|  US| 59572425|  519803259|               1426636|
|           null| CXH|CYHC| 52533120|1.35|  CA| 41250871| -214147340|               1426636|
|           null| KTT|EFKT| 36800952|7.52|  FI| 59886410|  446854828|               1426636|
|           null|   I|null| 11344999|2.94|  JP| 76509151|  446854828|               1426636|
|           null| DAL|null|369871207|4.07|null| 11410020| 1062621904| 

In [18]:
renamecol1_df=add_column.withColumnRenamed('billing_code_modifier ','mdH')
renamecol1_df.show()

+---------------+----+----+---------+----+----+---------+-----------+-------+
|_corrupt_record|bCls| bCT|       bC|negR|negT|     negA|       poSH|    mdH|
+---------------+----+----+---------+----+----+---------+-----------+-------+
|           null| NVA|null|548686381|1.89|  CO|492880290| -214147340|1426636|
|           null| JEQ|SNJK| 55315329|6.07|  BR| 10544105|  446854828|1426636|
|           null| SSO|SNLO| 13537015|4.78|  BR| 53489536|  446854828|1426636|
|           null| CKR|null| 42508138|4.35|  US| 59572425|  519803259|1426636|
|           null| CXH|CYHC| 52533120|1.35|  CA| 41250871| -214147340|1426636|
|           null| KTT|EFKT| 36800952|7.52|  FI| 59886410|  446854828|1426636|
|           null|   I|null| 11344999|2.94|  JP| 76509151|  446854828|1426636|
|           null| DAL|null|369871207|4.07|null| 11410020| 1062621904|1426636|
|           null| GDA|null| 76138106|9.05|  CF| 65954538| 1817792724|1426636|
|           null| AEX|KAEX| 53329809|3.61|  US| 50991216|  17548

In [19]:
hashing1_df = renamecol1_df.withColumn('mdH',hash('mdH'))
hashing1_df.show()

+---------------+----+----+---------+----+----+---------+-----------+---------+
|_corrupt_record|bCls| bCT|       bC|negR|negT|     negA|       poSH|      mdH|
+---------------+----+----+---------+----+----+---------+-----------+---------+
|           null| NVA|null|548686381|1.89|  CO|492880290| -214147340|678617635|
|           null| JEQ|SNJK| 55315329|6.07|  BR| 10544105|  446854828|678617635|
|           null| SSO|SNLO| 13537015|4.78|  BR| 53489536|  446854828|678617635|
|           null| CKR|null| 42508138|4.35|  US| 59572425|  519803259|678617635|
|           null| CXH|CYHC| 52533120|1.35|  CA| 41250871| -214147340|678617635|
|           null| KTT|EFKT| 36800952|7.52|  FI| 59886410|  446854828|678617635|
|           null|   I|null| 11344999|2.94|  JP| 76509151|  446854828|678617635|
|           null| DAL|null|369871207|4.07|null| 11410020| 1062621904|678617635|
|           null| GDA|null| 76138106|9.05|  CF| 65954538| 1817792724|678617635|
|           null| AEX|KAEX| 53329809|3.6