In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, current_date, expr, regexp_extract, round, to_date

spark = (SparkSession
    .builder
    .appName('BigDataProject')
    .getOrCreate()
)

csvFilePath = './Businesses_Registered_with_EBR_Parish_20240221.csv'
dfBusiness = spark.read.csv(csvFilePath, header=True, inferSchema=True)

csvBusinessLocationPath = './Street_Address_20240227.csv'
dfBusinessAddress = spark.read.csv(csvBusinessLocationPath, header=True, inferSchema=True)

24/02/29 19:32:55 WARN Utils: Your hostname, Prashants-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.93 instead (on interface en0)
24/02/29 19:32:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/29 19:32:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/29 19:32:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/02/29 19:32:57 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

In [42]:
# Filter out rows that are not in Baton Rouge, LA and does not have a business_name & naics_group is not defined
filteredDfBusiness = dfBusiness.filter(
    (col("PHYSICAL ADDRESS - CITY") == "BATON ROUGE") &
    (col("PHYSICAL ADDRESS - STATE") == "LA") &
    (col('LEGAL NAME') != '') &
    (col('NAICS GROUP') != '')
)

joined_df = filteredDfBusiness.join(
        dfBusinessAddress,
        filteredDfBusiness["ACCOUNT NAME"] == dfBusinessAddress["BUSINESS NAME"],
        "inner"
    ).select(
        col("LEGAL NAME").alias("BUSINESS NAME"),
        col("BUSINESS OPEN DATE"),
        col("BUSINESS STATUS"),
        col("BUSINESS CLOSE DATE"),
        filteredDfBusiness["NAICS CODE"],
        filteredDfBusiness["NAICS CATEGORY"],
        filteredDfBusiness["NAICS GROUP"],
        col("FULL STREET NAME"),
        col("PHYSICAL ADDRESS - CITY").alias("CITY"),
        col("PHYSICAL ADDRESS - STATE").alias("STATE"),
        col("GEOMETRY"),
    )

# regular expression pattern to extract latitude and longitude
pattern = r'POINT \(([^ ]+) ([^ ]+)\)'

joined_df = joined_df.withColumn('LATITUDE', regexp_extract(col('GEOMETRY'), pattern, 1)) \
                        .withColumn('LONGITUDE', regexp_extract(col('GEOMETRY'), pattern, 2))
joined_df = joined_df.drop('GEOMETRY')

# Converting string columns to date columns
# joined_df.show(2, truncate=False)
# date_format = "MM/dd/yyyy"
# joined_df = joined_df \
#     .withColumn("BUSINESS OPEN DATE", to_date(joined_df["BUSINESS OPEN DATE"], date_format)) \
#     .withColumn("BUSINESS CLOSE DATE", to_date(joined_df["BUSINESS CLOSE DATE"], date_format))

# joined_df.printSchema()
# joined_df.show(2, truncate=False)

+----------------------------+------------------+---------------+-------------------+----------+--------------------------------------------------+-----------------+----------------+-----------+-----+----------------+---------------+
|BUSINESS NAME               |BUSINESS OPEN DATE|BUSINESS STATUS|BUSINESS CLOSE DATE|NAICS CODE|NAICS CATEGORY                                    |NAICS GROUP      |FULL STREET NAME|CITY       |STATE|LATITUDE        |LONGITUDE      |
+----------------------------+------------------+---------------+-------------------+----------+--------------------------------------------------+-----------------+----------------+-----------+-----+----------------+---------------+
|HARDWARE STUDIO LLC         |12/01/2020        |O              |NULL               |444000    |Building Material & Garden Equip & Supplies Dealer|RETAIL TRADE (G) |PERKINS RD E    |BATON ROUGE|LA   |-91.022990315425|30.34764735661 |
|CONNIE'S CAKE COLLECTION LLC|04/17/2023        |O              

In [30]:
# Group by NAICS CATEGORY, PHYSICAL ADDRESS - CITY, and PHYSICAL ADDRESS - STATE and count occurrences
# distribution = dfNew.groupBy("NAICS CATEGORY", "PHYSICAL ADDRESS - CITY", "PHYSICAL ADDRESS - STATE").count()
# distributionByState = joined_df.groupBy(
#     "FULL STREET NAME",
#     "NAICS CATEGORY",
# ).agg(count("*").alias("COUNT"))

distributionByState = joined_df.groupBy(
    "FULL STREET NAME",
    "NAICS CATEGORY",
).agg(
    count("*").alias("TOTAL COUNT"),
    expr("sum(case when `BUSINESS STATUS` = 'O' then 1 else 0 end)").alias("ACTIVE COUNT"),
    expr("sum(case when `BUSINESS STATUS` = 'C' then 1 else 0 end)").alias("CLOSED COUNT")
)

print(joined_df.show(5))


+--------------------+------------------+---------------+-------------------+----------+--------------------+--------------------+----------------+-----------+-----+--------------------+--------+---------+
|       BUSINESS NAME|BUSINESS OPEN DATE|BUSINESS STATUS|BUSINESS CLOSE DATE|NAICS CODE|      NAICS CATEGORY|         NAICS GROUP|FULL STREET NAME|       CITY|STATE|            GEOMETRY|LATITUDE|LONGITUDE|
+--------------------+------------------+---------------+-------------------+----------+--------------------+--------------------+----------------+-----------+-----+--------------------+--------+---------+
| HARDWARE STUDIO LLC|              NULL|              O|               NULL|    444000|Building Material...|    RETAIL TRADE (G)|    PERKINS RD E|BATON ROUGE|   LA|POINT (-91.022990...|  -91.02|    30.35|
|CONNIE'S CAKE COL...|              NULL|              O|               NULL|    311000|  Food Manufacturing|   MANUFACTURING (D)|      BRITISH LN|BATON ROUGE|   LA|POINT (-91.

In [6]:
economy = joined_df.withColumn("BUSINESS OPEN DATE", to_date(joined_df["BUSINESS OPEN DATE"], "MM/dd/yyyy"))
economy = economy.withColumn("BUSINESS CLOSE DATE", to_date(economy["BUSINESS CLOSE DATE"], "MM/dd/yyyy"))

# Group by location and calculate the number of business openings and closures
location_trends = economy.groupBy("FULL STREET NAME").agg(
    count("BUSINESS OPEN DATE").alias("OPENINGS"),
    count("BUSINESS CLOSE DATE").alias("CLOSURES")
)

In [43]:
from cassandra.cluster import Cluster
# from cassandra.auth import PlainTextAuthProvider
# import pandas as pd

# Cassandra connection settings
contact_points = ['127.0.0.1']  # Cassandra contact points
keyspace = 'bigdata_keyspace'    # Keyspace name
# username = 'your_username'       # Cassandra username (if authentication is enabled)
# password = 'your_password'       # Cassandra password (if authentication is enabled)

# Connect to Cassandra cluster
# auth_provider = PlainTextAuthProvider(username=username, password=password) if username and password else None
cluster = Cluster(contact_points=contact_points)
session = cluster.connect()

# Use the keyspace
session.set_keyspace(keyspace)

In [44]:

# Prepare insert query
# table_name = f'{keyspace}.business_ebr'
# cassandra_column_names = [col.lower().replace(' ', '_') for col in joined_df.columns]
# column_names = ', '.join(cassandra_column_names)
# value_placeholders = ', '.join(['%s' for _ in joined_df.columns])
# insert_query = f"INSERT INTO {table_name} ({column_names}) VALUES ({value_placeholders})"

# for row in joined_df.collect():
#     # Execute the INSERT statement
#     session.execute(insert_query, tuple(row))


# Prepare insert query
table_name = f'{keyspace}.businesses'
cassandra_column_names = [col.lower().replace(' ', '_') for col in joined_df.columns]
column_names = ', '.join(cassandra_column_names)
value_placeholders = ', '.join(['%s' for _ in joined_df.columns])

insert_query = f"INSERT INTO {table_name} ({column_names}) VALUES ({value_placeholders})"

# Deleting previous records
session.execute(
    f"TRUNCATE {table_name}"
)

for row in joined_df.collect():
    session.execute(insert_query, tuple(row))

# Close the session and cluster connection
# session.shutdown()
# cluster.shutdown()


                                                                                

In [9]:
joined_df.count()

17407

In [11]:
contact_points = ['127.0.0.1']
keyspace = 'bigdata_keyspace'
table_name = f'{keyspace}.businesses'

# Connecting Cassandra DB
cluster = Cluster(contact_points=contact_points)
session = cluster.connect()
session.set_keyspace(keyspace)

# Execute a SELECT query to fetch all records from the table
select_query = f"SELECT * FROM {table_name};"
rows = session.execute(select_query)

# Convert the result into a list of tuples
data = [(row.business_close_date, row.business_name, row.business_open_date, row.business_status, 
         row.city, row.full_street_name, row.geometry, row.naics_category, row.naics_code, row.naics_group, row.state) 
        for row in rows]

# Create a DataFrame using PySpark
df = spark.createDataFrame(data, 
                           ["business_close_date", "business_name", "business_open_date", "business_status", 
                            "city", "full_street_name", "geometry", "naics_category", "naics_code", "naics_group", "state"])


In [12]:
df.printSchema()


root
 |-- business_close_date: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- business_open_date: string (nullable = true)
 |-- business_status: string (nullable = true)
 |-- city: string (nullable = true)
 |-- full_street_name: string (nullable = true)
 |-- geometry: string (nullable = true)
 |-- naics_category: string (nullable = true)
 |-- naics_code: long (nullable = true)
 |-- naics_group: string (nullable = true)
 |-- state: string (nullable = true)



In [16]:
df.schema.simpleString()

'struct<business_close_date:string,business_name:string,business_open_date:string,business_status:string,city:string,full_street_name:string,geometry:string,naics_category:string,naics_code:bigint,naics_group:string,state:string>'