## Import Credentials

In [0]:
%run "./Config"

In [0]:
# Mount the Container for importing and exporting the data
storageAccount = "gen10datafund2205"
storageContainer = "group1-marketing-analytics"
mount_point = "/mnt/group1/datain"

# OAuth client-credentials settings for the ABFS driver.
# clientid / clientSecret are presumably defined by the %run "./Config" cell.
configs2 = {"fs.azure.account.auth.type": "OAuth",
   "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
   "fs.azure.account.oauth2.client.id": clientid,
   "fs.azure.account.oauth2.client.secret": clientSecret,
   "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
   "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Re-running the cell must not fail on an existing mount, so unmount first --
# but only when the path is actually mounted. An explicit check is used
# instead of a bare `except: pass`, which would also hide unrelated errors
# (including interrupts).
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source=f"abfss://{storageContainer}@{storageAccount}.dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs2)

In [0]:
# Import relevant packages
from pyspark.sql.functions import col,count,when,regexp_replace, monotonically_increasing_id

In [0]:
# Read the Business json file
df = spark.read.format('json').load(mount_point+'/yelp_academic_dataset_business.json')
# Keep only states with more than 10 businesses.
# The qualifying state names are collected to the driver and passed to isin()
# as plain values. Passing count_df['state'] (a Column) does not work: since
# count_df is derived from df, that reference resolves back to df's own
# `state` column, so the condition does not restrict anything (or fails
# analysis, depending on the Spark version).
count_df = df.groupBy('state').count()
count_df = count_df.filter(col('count')>10)
state = [row['state'] for row in count_df.select('state').collect()]
# Keep open Restaurant/Food businesses that have categories and attributes,
# then drop the columns not used downstream and rename the remaining ones.
df = df.filter(col('state').isin(state))\
    .filter(col('state') != 'AB')\
    .filter(col('is_open') == 1)\
    .filter(col('categories').isNotNull())\
    .filter(col('attributes').isNotNull())\
    .filter("categories LIKE '%Restaurants%' OR categories LIKE '%Food%'")\
    .drop('hours','is_open','categories','address','city','state','latitude','longitude')\
    .withColumnRenamed('business_id','BusinessID')\
    .withColumnRenamed('stars','Stars')\
    .withColumnRenamed('review_count','BusinessReviewCount')\
    .withColumnRenamed('name','Name')\
    .withColumnRenamed('postal_code','ZipCode')

In [0]:
# Filter Business table by Zip Codes in zipCodeTable.
# The lookup is de-duplicated so the inner join acts as a pure filter: a ZIP
# code listed more than once would otherwise duplicate every matching
# business. (The join key ZipCode becomes the first column of df.)
zipCodeTable = spark.read.format('json').load(mount_point+'/ZipCodeTable_cleaned.json')
zipCodeTable = zipCodeTable.select('ZipCode').distinct()
df = df.join(zipCodeTable,on='ZipCode')

In [0]:
# List all fields in the attributes column.
# `attributes` is accessed as a struct later (col('attributes.<field>')), so
# its schema already lists every field; reading the schema avoids collecting
# the whole column to the driver just to look at one row.
attributes = df.schema['attributes'].dataType.fieldNames()
# Remove BusinessParking and Ambience from attributes
# (filtering instead of list.remove, which raises if a field is absent).
attributes = [a for a in attributes if a not in ('BusinessParking', 'Ambience')]

In [0]:
# Create a new DataFrame from Business that only contains BusinessID plus one
# column per attribute field, then remove the nested column from Business.
# A single select builds all columns at once; calling withColumn in a loop
# adds one projection to the query plan per field.
df2 = df.select('BusinessID', *[col(f'attributes.{i}').alias(i) for i in attributes])
df = df.drop('attributes')

In [0]:
# Normalize every column of df2 in one projection:
#   'None'  -> null
#   'True'  -> '1'
#   'False' -> '0'
#   anything else (including null) is kept unchanged.
# String literals '1'/'0' are used because the columns are strings; an int
# literal would rely on implicit int/string coercion, whose result depends on
# the ANSI type-coercion setting. Chaining the conditions in one expression
# also avoids three withColumn calls per column.
df2 = df2.select([
    when(df2[i] == 'None', None)
    .when(df2[i] == 'True', '1')
    .when(df2[i] == 'False', '0')
    .otherwise(df2[i])
    .alias(i)
    for i in df2.columns
])

In [0]:
# Strip Python-style u'...' / '...' wrapping from these string attributes.
# The three cleanup passes are composed into one expression per column:
#   1. value starts with 'u'  -> remove every "u'" substring;
#   2. then starts with "'"   -> remove every "'";
#   3. then ends with "'"     -> remove every "'".
# Null values fall through to otherwise() and stay null.
aList = ['NoiseLevel','RestaurantsAttire','Alcohol','WiFi']
for attr_name in aList:
    cleaned = df2[attr_name]
    cleaned = when(cleaned.startswith('u'), regexp_replace(cleaned, "u'", '')).otherwise(cleaned)
    cleaned = when(cleaned.startswith("'"), regexp_replace(cleaned, "'", '')).otherwise(cleaned)
    cleaned = when(cleaned.endswith("'"), regexp_replace(cleaned, "'", '')).otherwise(cleaned)
    df2 = df2.withColumn(attr_name, cleaned)

In [0]:
# Drop attribute columns in which more than half of the rows are null.
# The aggregation returns a single row: column name -> number of nulls.
null_count_row = df2.select([count(when(col(c).isNull(), c)).alias(c) for c in df2.columns]).collect()[0]
null_counts = null_count_row.asDict()
thresh = df2.count() / 2
to_drop = []
for column_name, n_null in null_counts.items():
    if n_null > thresh:
        to_drop.append(column_name)
df2 = df2.drop(*to_drop)

In [0]:
# Create Attributes DataFrame: number the surviving attributes 1..N and keep
# a name -> ID lookup for BusinessAttributes.
# The surviving attributes keep their original order so IDs are deterministic.
# Iterating a set of strings gives an order that can change between Python
# processes (string hash randomization), which would renumber the IDs
# appended to the database on every run.
dropped = set(to_drop)
attributesLeft = [a for a in attributes if a not in dropped]
attributeID = list(range(1, len(attributesLeft) + 1))
attributeDict = dict(zip(attributesLeft, attributeID))
attributeSchema = ['AttributesID','AttributesName']
attributesTable = spark.createDataFrame(zip(attributeID,attributesLeft),attributeSchema)

In [0]:
# Collect the sorted list of distinct non-null values across all remaining
# attribute columns. Each column's distinct values are collected in a single
# Spark action, instead of re-running collect() for every value.
attributesValue = set()
for i in attributesLeft:
    attributesValue.update(row[i] for row in df2.select(i).distinct().collect())
# discard() does not raise when no column contains a null (list.remove would)
attributesValue.discard(None)
attributesValue = sorted(attributesValue)

In [0]:
# Create AttributesValue DataFrame: number the sorted distinct values 1..N
# and keep a value -> ID lookup for building BusinessAttributes.
attributeValueID = list(range(1, len(attributesValue) + 1))
attributeValueDict = {value: value_id for value_id, value in zip(attributeValueID, attributesValue)}
attributeValueSchema = ['AttributesValuesID','ValueName']
attributeValuesTable = spark.createDataFrame(zip(attributeValueID,attributesValue),attributeValueSchema)

In [0]:
# Create BusinessAttributes DataFrame: one (BusinessID, AttributesID,
# AttributesValuesID) row per non-null attribute value of each business.
# An explicit schema gives the int ID columns directly (no post-hoc casts)
# and also works when no rows are produced (type inference would fail then).
businessAttributesSchema = 'BusinessID string, AttributesID int, AttributesValuesID int'
# Column 0 of df2 is BusinessID; the rest are attribute columns.
# Resolve each attribute's ID once instead of once per value.
attributeColumns = [(name, attributeDict[name]) for name in df2.columns[1:]]
Rows = []
for row in df2.collect():
    bID = row['BusinessID']
    for name, aID in attributeColumns:
        value = row[name]
        if value is not None:
            Rows.append((bID, aID, attributeValueDict[value]))
businessAttributes = spark.createDataFrame(Rows,businessAttributesSchema)

In [0]:
# Export the business table to the container for other datasets to use.
# coalesce(1) yields a single CSV part file under <mount_point>/business
# (no header option is set); existing output at that path is replaced.
business_csv_path = mount_point + '/business'
df.coalesce(1).write.csv(business_csv_path, mode='overwrite')

In [0]:
# Write to the Business table in SQL Database from the corresponding DataFrame.
# Best-effort: on failure the error is printed and the notebook continues,
# but the actual exception is shown so the cause is not lost.
try:
    df.write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", 'dbo.Business') \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()
except Exception as e:
    print(f'An error occurred while writing dbo.Business: {e}')

In [0]:
# Write to the Attributes Table in SQL Database from the corresponding DataFrame.
# Best-effort: on failure the error is printed and the notebook continues,
# but the actual exception is shown so the cause is not lost.
try:
    attributesTable.write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", 'dbo.Attributes') \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()
except Exception as e:
    print(f'An error occurred while writing dbo.Attributes: {e}')

In [0]:
# Write to the AttributesValues Table in SQL Database from the corresponding DataFrame.
# Best-effort: on failure the error is printed and the notebook continues,
# but the actual exception is shown so the cause is not lost.
try:
    attributeValuesTable.write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", 'dbo.AttributesValues') \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()
except Exception as e:
    print(f'An error occurred while writing dbo.AttributesValues: {e}')

In [0]:
# Write to the BusinessAttributes Table in SQL Database from the corresponding DataFrame.
# Best-effort: on failure the error is printed and the notebook continues,
# but the actual exception is shown so the cause is not lost.
try:
    businessAttributes.write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", 'dbo.BusinessAttributes') \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()
except Exception as e:
    print(f'An error occurred while writing dbo.BusinessAttributes: {e}')