In [76]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, isnan, when, count, regexp_replace, split, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
import pydeequ
import sys
import os

In [77]:
def init_spark_session():
    spark = (SparkSession
        .builder
        .config("spark.jars.packages", pydeequ.deequ_maven_coord)
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
        .getOrCreate())

    return spark

def read_file_from_args():
  print("Args: ", sys.argv[1:])
  filepath = ".\\data\\{}".format(sys.argv[1])
  return filepath


def data_cleaning(df_init):
  df_init 


### Define Schema

In [78]:
schema = StructType([ \
    StructField("url", StringType(), True),
    StructField("address", StringType(), True),
    StructField("name", StringType(), False),
    StructField("rate", StringType(), True),
    StructField("votes", IntegerType(), True),
    StructField("phone", IntegerType(), False),
    StructField("location", StringType(), False),
    StructField("rest_type", StringType(), True),
    StructField("dish_liked", ArrayType(StringType()), True),
    StructField("cuisines", StringType(), True),
    StructField("reviews_list", ArrayType(StringType()), True)
])

In [79]:
if __name__ == "__main__":
  
  print("Setting up spark and variables...")
  spark = init_spark_session()

  csv_filepath= read_file_from_args()

  df = spark.read \
          .option("header", True) \
          .option("delimiter",",") \
      .csv("data/data_file_20210527182730.csv")

Setting up spark and variables...
Args:  ['-f', 'C:\\Users\\9837340\\AppData\\Roaming\\jupyter\\runtime\\kernel-22689fcf-2eec-46f5-8f4b-7345e1d81fd4.json']


In [80]:
df.toPandas()

Unnamed: 0,2url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,,,,,,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...
2,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
3,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 966348799,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...
4,https://www.zomato/bangalore/addhuri-udupi-bho...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,No,No,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,Masala Dosa,"South Indian, North Indian",300,"""[('Rated 4.0', """"RATED\n Great food and prop...",('Rated 2.0','RATED\n Reached the place at 3pm on Saturda...,('Rated 4.0'
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10284,https://www.zomato.com/bangalore/oakleaf-white...,"62, Forum Neighbourhood Mall, Whitefield, Bang...",Oakleaf,Yes,No,3.9/5,40,080 66708888,,,,,,,,,
10285,"+91 725959971""",Whitefield,Casual Dining,Salmon,"Asian, Continental, North Indian, Italian",2000,"[('Rated 5.0', 'RATED\n Stayed at Oakwood Res...","['Cheese Stuffed Paratha', 'Crunchy Panko Coat...",Delivery,Brookefield,,,,,,,
10286,https://www.zomato.com/bangalore/kidilum-white...,"125/5, Behind ITPL, Pattandur Agrahara, Whitef...",Kidilum,Yes,No,3.8/5,207,+91 9845041750,,,,,,,,,
10287,"+91 890486998""",Whitefield,Quick Bites,"Beef Fry, Appam, Chicken Curry, Kerala Parotta...","Kerala, Chinese, North Indian",350,"[('Rated 4.0', 'RATED\n Ordered morning break...","['Egg Roast', 'Chicken Curry', 'Chicken Roast'...",Delivery,Brookefield,,,,,,,


### Dropping unused columns

In [81]:
df_drop_columns = df.drop('online_order', 'book_table', 'approx_cost(for two people)', 'menu_item', 'listed_in(type)', 'listed_in(city)')
df_drop_columns.printSchema()

root
 |-- 2url: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rate: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rest_type: string (nullable = true)
 |-- dish_liked: string (nullable = true)
 |-- cuisines: string (nullable = true)
 |-- reviews_list: string (nullable = true)



In [82]:
df_column_castted = df_drop_columns.withColumn("dish_liked_array", split(col('dish_liked'), ',')) \
                        .withColumn("review_list_array", split(col('reviews_list'), ',')) \
                        .withColumn("cuisines_array", split(col("cuisines"), ',')) \
                        .drop("dish_liked", "reviews_list", "cuisines")
df_column_castted.printSchema()

root
 |-- 2url: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rate: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rest_type: string (nullable = true)
 |-- dish_liked_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- review_list_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- cuisines_array: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [83]:
df_column_castted.toPandas()

Unnamed: 0,2url,address,name,rate,votes,phone,location,rest_type,dish_liked_array,review_list_array,cuisines_array
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,4.1/5,775,080 42297555,,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0',[ 'RATED\n Overdelighted by the service and f...,[ ('Rated 4.0'],[ ('Rated 4.0']
2,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,4.1/5,787,080 41714161,Banashankari,Casual Dining,"[Momos, Lunch Buffet, Chocolate Nirvana, Th...","[""[('Rated 4.0', 'RATED\n Had been here for ...","[Chinese, North Indian, Thai]"
3,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,3.8/5,918,+91 966348799,Banashankari,"Cafe, Casual Dining","[Churros, Cannelloni, Minestrone Soup, Hot ...","[""[('Rated 3.0', """"RATED\n Ambience is not t...","[Cafe, Mexican, Italian]"
4,https://www.zomato/bangalore/addhuri-udupi-bho...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,[Masala Dosa],"[""[('Rated 4.0', """"RATED\n Great food and pr...","[South Indian, North Indian]"
...,...,...,...,...,...,...,...,...,...,...,...
10284,https://www.zomato.com/bangalore/oakleaf-white...,"62, Forum Neighbourhood Mall, Whitefield, Bang...",Oakleaf,3.9/5,40,080 66708888,,,,,
10285,"+91 725959971""",Whitefield,Casual Dining,2000,"[('Rated 5.0', 'RATED\n Stayed at Oakwood Res...","['Cheese Stuffed Paratha', 'Crunchy Panko Coat...",Delivery,Brookefield,,,
10286,https://www.zomato.com/bangalore/kidilum-white...,"125/5, Behind ITPL, Pattandur Agrahara, Whitef...",Kidilum,3.8/5,207,+91 9845041750,,,,,
10287,"+91 890486998""",Whitefield,Quick Bites,350,"[('Rated 4.0', 'RATED\n Ordered morning break...","['Egg Roast', 'Chicken Curry', 'Chicken Roast'...",Delivery,Brookefield,,,


## Data Exploration

In [84]:
df.select('rest_type').distinct().show()

+--------------------+
|           rest_type|
+--------------------+
| I tried blue lim...|
| he said ' This i...|
| chicken ghee roa...|
| very reasonably ...|
|Beverage Shop, De...|
| thought of givin...|
| received re frie...|
|['New Chicken Mah...|
|               Dhaba|
| this is the plac...|
| place order agai...|
|              Bakery|
| ""RATED\n  Since...|
| Chicken tikka ma...|
| not bad at all. ...|
| as well as Iced ...|
|                 Bar|
| have tried their...|
|        'Raagi Roti'|
| but with excelle...|
+--------------------+
only showing top 20 rows



In [85]:
df_regex_url = df_column_castted.withColumn("valid_url", regexp_extract(col('2url'), r"[(http(s)?):\/\/(www\.)?a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*)", 0))
df_regex_columns = df_regex_url.withColumn("phone_valid", regexp_replace( regexp_extract(col('phone'), r"\d[\d -]{8,12}\d", 0), " ", ''))
df_regex_columns.select('2url', 'valid_url', 'phone', 'phone_valid').toPandas()

Unnamed: 0,2url,valid_url,phone,phone_valid
0,https://www.zomato.com/bangalore/jalsa-banasha...,https://www.zomato.com/bangalore/jalsa-banasha...,080 42297555,08042297555
1,"+91 9743772233""",,('Rated 4.0',
2,https://www.zomato.com/bangalore/spice-elephan...,https://www.zomato.com/bangalore/spice-elephan...,080 41714161,08041714161
3,https://www.zomato.com/SanchurroBangalore?cont...,https://www.zomato.com/SanchurroBangalore?cont...,+91 966348799,91966348799
4,https://www.zomato/bangalore/addhuri-udupi-bho...,https://www.zomato/bangalore/addhuri-udupi-bho...,+91 9620009302,919620009302
...,...,...,...,...
10284,https://www.zomato.com/bangalore/oakleaf-white...,https://www.zomato.com/bangalore/oakleaf-white...,080 66708888,08066708888
10285,"+91 725959971""",,"['Cheese Stuffed Paratha', 'Crunchy Panko Coat...",
10286,https://www.zomato.com/bangalore/kidilum-white...,https://www.zomato.com/bangalore/kidilum-white...,+91 9845041750,919845041750
10287,"+91 890486998""",,"['Egg Roast', 'Chicken Curry', 'Chicken Roast'...",


In [86]:
df_regex_columns_v2 = df_regex_columns.withColumn("valid_votes", regexp_extract(col('votes'), r"^[0-9]*$", 0))
df_regex_columns_v2.select('votes', 'valid_votes').toPandas()

Unnamed: 0,votes,valid_votes
0,775,775
1,"""[('Rated 4.0', 'RATED\n A beautiful place to...",
2,787,787
3,918,918
4,88,88
...,...,...
10284,40,40
10285,"[('Rated 5.0', 'RATED\n Stayed at Oakwood Res...",
10286,207,207
10287,"[('Rated 4.0', 'RATED\n Ordered morning break...",


In [87]:
df_replace_comma_address = df_regex_columns_v2.withColumn("address_cleaned", regexp_replace(regexp_extract(col('address'), r"^[#.0-9a-zA-Z\s,-]+$", 0), ',', "")) \
                                            .withColumn("rate_valid", regexp_extract(col('rate'), r"(\d+\.?\d+)\/(\d*\.?\d*)",0))
df_replace_comma_address.select('address', 'address_cleaned', 'rate', 'rate_valid').toPandas()

Unnamed: 0,address,address_cleaned,rate,rate_valid
0,"942, 21st Main Road, 2nd Stage, Banashankari, ...",942 21st Main Road 2nd Stage Banashankari Bang...,4.1/5,4.1/5
1,Banashankari,Banashankari,800,
2,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",2nd Floor 80 Feet Road Near Big Bazaar 6th Blo...,4.1/5,4.1/5
3,"1112, Next to KIMS Medical College, 17th Cross...",1112 Next to KIMS Medical College 17th Cross 2...,3.8/5,3.8/5
4,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",1st Floor Annakuteera 3rd Stage Banashankari B...,3.7/5,3.7/5
...,...,...,...,...
10284,"62, Forum Neighbourhood Mall, Whitefield, Bang...",62 Forum Neighbourhood Mall Whitefield Bangalore,3.9/5,3.9/5
10285,Whitefield,Whitefield,2000,
10286,"125/5, Behind ITPL, Pattandur Agrahara, Whitef...",,3.8/5,3.8/5
10287,Whitefield,Whitefield,350,


#### Location dataset lookup and validation

In [88]:
df_location = spark.read \
          .option("inferSchema",True) \
          .option("header", True) \
          .option("delimiter",",") \
      .csv("data/Areas_in_blore.csv")

In [89]:
df_location.printSchema()

root
 |-- Area: string (nullable = true)
 |-- Taluk: string (nullable = true)
 |-- District: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Pincode: integer (nullable = true)



In [90]:
df_location.toPandas()

Unnamed: 0,Area,Taluk,District,State,Pincode
0,A F Station Yelahanka,Bangalore North,Bangalore,KARNATAKA,560063
1,Agram,Bangalore South,Bangalore,KARNATAKA,560007
2,Air Force Hospital,Bangalore North,Bangalore,KARNATAKA,560007
3,Amruthahalli,Bangalore North,Bangalore,KARNATAKA,560092
4,Anandnagar Bangalore,Bangalore North,Bangalore,KARNATAKA,560024
...,...,...,...,...,...
259,Tavarekere Bangalore,Bangaloresouth,Bangalore,KARNATAKA,562130
260,Thammanayakanahalli,Anekal,Bangalore,KARNATAKA,562106
261,Vanakanahalli,Anekal,Bangalore,KARNATAKA,562106
262,Vidyanagara,Bg North,Bangalore,KARNATAKA,562157


### LEFT JOIN df with df_location
#### To see which df row is valid for location

In [91]:
df_join_location = df_replace_comma_address.join(df_location, df_replace_comma_address['location'] == df_location['Area'], "left")

In [92]:
print(df_join_location.filter(col("Area").isNotNull()).count())

2246


In [93]:
df_join_location.filter(col("Area").isNotNull()).toPandas()

Unnamed: 0,2url,address,name,rate,votes,phone,location,rest_type,dish_liked_array,review_list_array,...,valid_url,phone_valid,valid_votes,address_cleaned,rate_valid,Area,Taluk,District,State,Pincode
0,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,4.1/5,787,080 41714161,Banashankari,Casual Dining,"[Momos, Lunch Buffet, Chocolate Nirvana, Th...","[""[('Rated 4.0', 'RATED\n Had been here for ...",...,https://www.zomato.com/bangalore/spice-elephan...,08041714161,787,2nd Floor 80 Feet Road Near Big Bazaar 6th Blo...,4.1/5,Banashankari,Bangalore South,Bangalore,KARNATAKA,560050
1,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,3.8/5,918,+91 966348799,Banashankari,"Cafe, Casual Dining","[Churros, Cannelloni, Minestrone Soup, Hot ...","[""[('Rated 3.0', """"RATED\n Ambience is not t...",...,https://www.zomato.com/SanchurroBangalore?cont...,91966348799,918,1112 Next to KIMS Medical College 17th Cross 2...,3.8/5,Banashankari,Bangalore South,Bangalore,KARNATAKA,560050
2,https://www.zomato/bangalore/addhuri-udupi-bho...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,[Masala Dosa],"[""[('Rated 4.0', """"RATED\n Great food and pr...",...,https://www.zomato/bangalore/addhuri-udupi-bho...,919620009302,88,1st Floor Annakuteera 3rd Stage Banashankari B...,3.7/5,Banashankari,Bangalore South,Bangalore,KARNATAKA,560050
3,https://www.zomato.com/bangalore/cafe-shuffle-...,"941, 3rd FLOOR, 21st Main, 22nd Cross, Banasha...",Cafe Shuffle,4.2/5,150,+91 974216677,Banashankari,Cafe,"[Mocktails, Peri Fries, Lasagne, Pizza, Ch...","[""[('Rated 1.0', """"RATED\n \n\nHorrible. Not ...",...,https://www.zomato.com/bangalore/cafe-shuffle-...,91974216677,150,941 3rd FLOOR 21st Main 22nd Cross Banashankar...,4.2/5,Banashankari,Bangalore South,Bangalore,KARNATAKA,560050
4,https://www.zomato.com/bangalore/the-coffee-sh...,"6th Block, 3rd Stage, Banashankari, Bangalore",The Coffee Shack,4.2/5,164,+91 973164421,Banashankari,Cafe,"[Coffee, Spaghetti, Pancakes, Nachos, Past...","[""[('Rated 4.0', """"RATED\n Food - 4/5\nAmbie...",...,https://www.zomato.com/bangalore/the-coffee-sh...,91973164421,164,6th Block 3rd Stage Banashankari Bangalore,4.2/5,Banashankari,Bangalore South,Bangalore,KARNATAKA,560050
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2241,https://www.zomato.com/bangalore/bangalore-aga...,"47, Bangalore Agarwal Bhavan, Pattandur Agraha...",Bangalore Agarwal Bhavan,3.3/5,6,+91 8310009071,Whitefield,Sweet Shop,,"[[('Rated 1.0', 'RATED\n Had ordered food on...",...,https://www.zomato.com/bangalore/bangalore-aga...,918310009071,6,47 Bangalore Agarwal Bhavan Pattandur Agrahara...,3.3/5,Whitefield,Bangalore South,Bangalore,KARNATAKA,560066
2242,https://www.zomato.com/bangalore/mangalore-lun...,"SJR Teck Park, Plot 13, 14, 15, EpIp Area, Whi...",Mangalore Lunch Home,3.4/5,8,+91 9448426577,Whitefield,Quick Bites,,"[""[('Rated 4.0', 'RATED\n I ordered neer dos...",...,https://www.zomato.com/bangalore/mangalore-lun...,919448426577,8,SJR Teck Park Plot 13 14 15 EpIp Area Whitefie...,3.4/5,Whitefield,Bangalore South,Bangalore,KARNATAKA,560066
2243,https://www.zomato.com/bangalore/desi-doze-whi...,"1, Nallurhalli Road, Opposite Brigade RV Cente...",Desi Doze,3.8/5,147,+91 9742425962,Whitefield,Quick Bites,"[Lassi, Paratha, Chaach, Chole Bhature, Th...","[""[('Rated 1.0', 'RATED\n Dal was completely...",...,https://www.zomato.com/bangalore/desi-doze-whi...,919742425962,147,1 Nallurhalli Road Opposite Brigade RV Center ...,3.8/5,Whitefield,Bangalore South,Bangalore,KARNATAKA,560066
2244,https://www.zomato.com/bangalore/hunger-stop-w...,"Opposite to Neeladari Apartment,Nallurahalli R...",Hunger Stop,3.7/5,100,+91 7829999672,Whitefield,Quick Bites,,"[""[('Rated 3.0', """"RATED\n Actual 3.5(since ...",...,https://www.zomato.com/bangalore/hunger-stop-w...,917829999672,100,Opposite to Neeladari ApartmentNallurahalli Ro...,3.7/5,Whitefield,Bangalore South,Bangalore,KARNATAKA,560066


In [94]:
df_valid_location = df_join_location.filter(col("Area").isNotNull())

In [95]:
df_nulls_filtered = df_valid_location.filter( (col('name').isNotNull()  
                                               & col('phone').isNotNull() 
                                               & col('location').isNotNull()) )
df_nulls_filtered.count()

2246

In [96]:
df_nulls_filtered.printSchema()

root
 |-- 2url: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rate: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rest_type: string (nullable = true)
 |-- dish_liked_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- review_list_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- cuisines_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- valid_url: string (nullable = true)
 |-- phone_valid: string (nullable = true)
 |-- valid_votes: string (nullable = true)
 |-- address_cleaned: string (nullable = true)
 |-- rate_valid: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Taluk: string (nullable = true)
 |-- District: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Pincode: integer (nullable = true)


In [97]:
df_drop_unused_columns = df_nulls_filtered.drop('2url', 'address', 'phone', 'votes', 'rate', 'Area', 'Taluk', 'District', 'State', 'Pincode')
df_renamed_columns = df_drop_unused_columns.withColumnRenamed('valid_url','url') \
                                        .withColumnRenamed('address_cleaned', 'address') \
                                        .withColumnRenamed('phone_valid', 'phone') \
                                        .withColumnRenamed('valid_votes', 'votes') \
                                        .withColumnRenamed('rate_valid', 'rate')

df_renamed_columns.printSchema()

root
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rest_type: string (nullable = true)
 |-- dish_liked_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- review_list_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- cuisines_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- url: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- address: string (nullable = true)
 |-- rate: string (nullable = true)



#### As CSV doesn't support array types we need to convert them to String type.
##### Here is a example of how I'm going to convert array type columns to string

In [98]:
df_renamed_columns.withColumn('dish_liked', concat_ws(' ', col('dish_liked_array'))).select('dish_liked').toPandas()

Unnamed: 0,dish_liked
0,Momos Lunch Buffet Chocolate Nirvana Thai G...
1,Churros Cannelloni Minestrone Soup Hot Choc...
2,Masala Dosa
3,Mocktails Peri Fries Lasagne Pizza Chicken...
4,Coffee Spaghetti Pancakes Nachos Pasta Sa...
...,...
2241,
2242,
2243,Lassi Paratha Chaach Chole Bhature Thali
2244,


In [99]:
df_arrays_to_string = df_renamed_columns.withColumn('dish_liked', concat_ws(' ', col('dish_liked_array'))) \
                                    .withColumn('review_list', concat_ws(' ', col('review_list_array'))) \
                                    .withColumn('cuisines', concat_ws(' ', col('cuisines_array'))) \
                                .drop('dish_liked_array', 'review_list_array', 'cuisines_array')

In [100]:
df_arrays_to_string.select('dish_liked', 'review_list', 'cuisines').toPandas()

Unnamed: 0,dish_liked,review_list,cuisines
0,Momos Lunch Buffet Chocolate Nirvana Thai G...,"""[('Rated 4.0' 'RATED\n Had been here for di...",Chinese North Indian Thai
1,Churros Cannelloni Minestrone Soup Hot Choc...,"""[('Rated 3.0' """"RATED\n Ambience is not tha...",Cafe Mexican Italian
2,Masala Dosa,"""[('Rated 4.0' """"RATED\n Great food and prop...",South Indian North Indian
3,Mocktails Peri Fries Lasagne Pizza Chicken...,"""[('Rated 1.0' """"RATED\n \n\nHorrible. Not ev...",Cafe Italian Continental
4,Coffee Spaghetti Pancakes Nachos Pasta Sa...,"""[('Rated 4.0' """"RATED\n Food - 4/5\nAmbienc...",Cafe Chinese Continental Italian
...,...,...,...
2241,,[('Rated 1.0' 'RATED\n Had ordered food onli...,Mithai Street Food
2242,,"""[('Rated 4.0' 'RATED\n I ordered neer dosa ...",North Indian South Indian Mangalorean
2243,Lassi Paratha Chaach Chole Bhature Thali,"""[('Rated 1.0' 'RATED\n Dal was completely b...",North Indian Fast Food
2244,,"""[('Rated 3.0' """"RATED\n Actual 3.5(since we...",North Indian Chinese Fast Food


#### Cleaning review_list from unwanted special chars/ junk chars

In [101]:
df_special_char_cleaned = df_arrays_to_string.withColumn('review_list', regexp_replace(col('review_list'), "[^a-zA-Z0-9]", " ")) \
                                            .withColumn("phone", df_arrays_to_string.phone.cast('integer')) \
                                            .withColumn("votes", df_arrays_to_string.votes.cast('integer'))

#### Sorting column as the output expected

In [102]:
df_columns_sorted = df_special_char_cleaned.select(col('url'),
                                             col('address'),
                                             col('name'),
                                             col('rate'),
                                             col('votes'),
                                             col('phone'),
                                             col('location'),
                                             col('rest_type'),
                                             col('dish_liked'),
                                             col('cuisines'),
                                             col('review_list'))
df_columns_sorted.printSchema()

root
 |-- url: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rate: string (nullable = true)
 |-- votes: integer (nullable = true)
 |-- phone: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- rest_type: string (nullable = true)
 |-- dish_liked: string (nullable = false)
 |-- cuisines: string (nullable = false)
 |-- review_list: string (nullable = false)



### Writting Valid output data (mostly)

In [103]:
df_columns_sorted.toPandas().to_csv("./output/record_out.csv", index=False)

#### As we can see, joining the main data with area data is going to tell us which register is going to be valid for us (at least to be valid for this test). So as we got valid data only with the join of this two dataframes, now those how doesn't have a valid location are going to be grouped as _**bad data**_

#### For bad data or data that doesn't meet the requirements are going to be writed as **_'record_bad.csv'_**

#### As we can see there is a lot of error in this new dataset.

In [64]:
df_join_location.filter(col('Area').isNull()).toPandas()

Unnamed: 0,2url,address,name,rate,votes,phone,location,rest_type,dish_liked_array,review_list_array,...,valid_url,phone_valid,valid_votes,address_cleaned,rate_valid,Area,Taluk,District,State,Pincode
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,4.1/5,775,080 42297555,,,,,...,https://www.zomato.com/bangalore/jalsa-banasha...,08042297555,775,942 21st Main Road 2nd Stage Banashankari Bang...,4.1/5,,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0',[ 'RATED\n Overdelighted by the service and f...,[ ('Rated 4.0'],...,,,,Banashankari,,,,,,
2,https://www.zomato.com/bangalore/grand-village...,"10, 3rd Floor, Lakshmi Associates, Gandhi Baza...",Grand Village,3.8/5,166,+91 8026612447,,,,,...,https://www.zomato.com/bangalore/grand-village...,918026612447,166,10 3rd Floor Lakshmi Associates Gandhi Bazaar ...,3.8/5,,,,,
3,"+91 990121000""",Basavanagudi,Casual Dining,600,"[('Rated 4.0', 'RATED\n Very good restaurant ...",[],Buffet,Banashankari,,,...,,,,Basavanagudi,,,,,,
4,https://www.zomato/bangalore/timepass-dinner-b...,"37, 5-1, 4th Floor, Bosco Court, Gandhi Bazaar...",Timepass Dinner,3.8/5,286,+91 9980040002,,,,,...,https://www.zomato/bangalore/timepass-dinner-b...,919980040002,286,37 5-1 4th Floor Bosco Court Gandhi Bazaar Mai...,3.8/5,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8038,https://www.zomato.com/bangalore/south-house-b...,"579, Ground Floor, C Block AECS Layout, Kundal...",South House,3.8/5,97,+91 960627999,Brookefield,Casual Dining,"[Egg Biryani, Roti, Mutton Biryani, Chicken...","[""[('Rated 3.0', 'RATED\n Ordered Egg Biryan...",...,https://www.zomato.com/bangalore/south-house-b...,91960627999,97,579 Ground Floor C Block AECS Layout Kundalaha...,3.8/5,,,,,
8039,https://www.zomato.com/bangalore/oakleaf-white...,"62, Forum Neighbourhood Mall, Whitefield, Bang...",Oakleaf,3.9/5,40,080 66708888,,,,,...,https://www.zomato.com/bangalore/oakleaf-white...,08066708888,40,62 Forum Neighbourhood Mall Whitefield Bangalore,3.9/5,,,,,
8040,"+91 725959971""",Whitefield,Casual Dining,2000,"[('Rated 5.0', 'RATED\n Stayed at Oakwood Res...","['Cheese Stuffed Paratha', 'Crunchy Panko Coat...",Delivery,Brookefield,,,...,,,,Whitefield,,,,,,
8041,https://www.zomato.com/bangalore/kidilum-white...,"125/5, Behind ITPL, Pattandur Agrahara, Whitef...",Kidilum,3.8/5,207,+91 9845041750,,,,,...,https://www.zomato.com/bangalore/kidilum-white...,919845041750,207,,3.8/5,,,,,


In [65]:
df_bad_data = df_join_location.filter(col('Area').isNull()) \
                              .withColumn('dish_liked', concat_ws(' ', col('dish_liked_array'))) \
                              .withColumn('review_list', concat_ws(' ', col('review_list_array'))) \
                              .withColumn('cuisines', concat_ws(' ', col('cuisines_array'))) \
                            .drop('2url', 'address', 'phone', 'votes', 'rate', 'Area', 'Taluk', 'District', 'State', 'Pincode','dish_liked_array', 'review_list_array', 'cuisines_array') \
                            .withColumnRenamed('valid_url','url') \
                            .withColumnRenamed('address_cleaned', 'address') \
                            .withColumnRenamed('phone_valid', 'phone') \
                            .withColumnRenamed('valid_votes', 'votes') \
                            .withColumnRenamed('rate_valid', 'rate') \
                        .select(col('url'),
                                col('address'),
                                col('name'),
                                col('rate'),
                                col('votes'),
                                col('phone'),
                                col('location'),
                                col('rest_type'),
                                col('dish_liked'),
                                col('cuisines'),
                                col('review_list'))


In [66]:
df_bad_data.printSchema()

root
 |-- url: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rate: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rest_type: string (nullable = true)
 |-- dish_liked: string (nullable = false)
 |-- cuisines: string (nullable = false)
 |-- review_list: string (nullable = false)



### Writing bad data as CSV file

In [67]:
df_bad_data.toPandas().to_csv("./output/record_bad.csv", index=False)

### Data Validation

##### * Votes column:

In [107]:
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult_votes_column = VerificationSuite(spark) \
    .onData(df_columns_sorted) \
    .addCheck(
        #Votes
        check.isNonNegative("votes") \
        .hasDataType("votes", ConstrainableDataTypes.Numeric) \
        .isComplete("votes")
        .hasMin("votes", lambda x: x >= 0)
        #Phone
        .hasDataType("phone", ConstrainableDataTypes.Numeric) \
        .isComplete("phone") \
        .isNonNegative("phone") \
        #Location
        .hasDataType("location", ConstrainableDataTypes.String) \
        .isComplete("location") \
        #Name
        .hasDataType("name", ConstrainableDataTypes.String) \
        .isComplete("name")
    
        ) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult_votes_column)
checkResult_df.show(truncate=True)

+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+



##### As above shows us, we can use Pydeequ framework to analyze and check data validations on Spark