In [6]:
%pip install pandas sqlalchemy
%pip install azure-storage-blob

import pandas as pd
import zipfile
import os
import shutil
import io
import requests
import json
import time
import glob
from io import StringIO
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from sqlalchemy import create_engine
from geopy.geocoders import Nominatim
from tqdm import tqdm

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
# URL of the zip file
zip_url = "https://www.fns.usda.gov/sites/default/files/resource-files/historical-snap-retailer-locator-data-2023.12.31.zip"

# Download the zip file
response = requests.get(zip_url, timeout=30)

# Read the zip file contents
dfs = []  # List to store DataFrames from individual CSV files

with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
    # Iterate through each file in the zip archive
    for filename in zf.namelist():
        if filename.lower().endswith('.csv'):
            # Read the CSV file into a Pandas DataFrame
            with zf.open(filename) as f:
                df = pd.read_csv(f)
                dfs.append(df)

# Combine all DataFrames into a single DataFrame
raw_df = pd.concat(dfs, ignore_index=True)

# raw_df.to_csv('snap_retailer_data.csv', index=False, mode='w')
# raw_df = pd.read_csv('snap_retailer_data.csv')

In [8]:
# Custom retry logic with exponential backoff
def custom_retry(func):
    max_attempts = 2
    wait_min = 4
    wait_max = 10

    def wrapper(*args, **kwargs):
        attempts = 0
        while attempts < max_attempts:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                print(f"Error: {e}. Retrying...")
                time.sleep(wait_min + (wait_max - wait_min) * attempts / max_attempts)
                attempts += 1
        print(f"Maximum retry attempts ({max_attempts}) reached. Returning None.")
        return None, None

    return wrapper

# Function to get latitude and longitude from address with custom retry logic
@custom_retry
def get_lat_long(address):
    geolocator = Nominatim(user_agent="my_geocoder")
    location = geolocator.geocode(address, timeout=10)  # Increase timeout to 10 seconds
    if location:
        return location.latitude, location.longitude
    else:
        return None, None

# Function to update latitude and longitude if either is 0
def update_lat_long(row):
    if row['Latitude'] == 0 or row['Longitude'] == 0:
        # Construct the address from address components
        address = (
            f"{row['Street Number']} {row['Street Name']}, "
            f"{row['City']}, {row['State']} {row['Zip Code']}"
        )
        try:
            # Get new latitude and longitude with custom retry logic
            new_lat, new_long = get_lat_long(address)
            if new_lat is not None and new_long is not None:
                # Update the latitude and longitude values
                row['Latitude'] = new_lat
                row['Longitude'] = new_long
        except Exception as e:
            print(f"Error geocoding address: {address} - {e}")
    return row

# Prompt user for confirmation
proceed = input("Do you want to update latitude and longitude where necessary? (yes/no): ").lower()

if proceed == "yes":
    # Apply the update_lat_long function to the DataFrame with tqdm progress bar
    tqdm.pandas()
    raw_df = raw_df.progress_apply(update_lat_long, axis=1)

    print("Latitude and Longitude values updated where necessary.")
else:
    print("No updates performed.")

No updates performed.


In [9]:
# Remove rows with invalid coordinates (latitude or longitude is zero)
raw_df = raw_df[(raw_df['Latitude'] != 0) & (raw_df['Longitude'] != 0)]

# Strip leading and trailing whitespace from specified string columns
columns_to_strip = ['Store Name', 'Store Type', 'Street Number', 'Street Name', 
                    'Additional Address', 'City', 'State', 'Zip4', 'County']

raw_df[columns_to_strip] = raw_df[columns_to_strip].astype(str)
raw_df[columns_to_strip] = raw_df[columns_to_strip].apply(lambda x: x.str.strip())

# Clean 'Street Number' by removing non-numeric characters and convert to numeric
raw_df['Street Number'] = raw_df['Street Number'].astype(str).str.replace(r'\D+', '', regex=True) 
raw_df['Street Number'] = pd.to_numeric(raw_df['Street Number'], errors='coerce')

# Drop rows with NaN values in 'Street Number'
raw_df = raw_df.dropna(subset=['Street Number'], axis=0)

# Drop rows with missing essential address components
required_columns = ['Street Number', 'Street Name', 'City', 'State', 'Zip Code']
raw_df = raw_df.dropna(subset=required_columns, how="any")

# Convert date columns to datetime format
raw_df["Authorization Date"] = pd.to_datetime(raw_df["Authorization Date"], errors="coerce")
raw_df["End Date"] = pd.to_datetime(raw_df["End Date"], errors="coerce")

# Extract year from 'Authorization Date' and create 'Authorization Year' column
raw_df["Authorization Year"] = raw_df["Authorization Date"].dt.year

# Select final columns of interest for analysis or export
columns_of_interest = [
    'Record ID', 'Store Name', 'Store Type', 'Street Number', 'Street Name',
    'Additional Address', 'City', 'State', 'Zip Code', 'Zip4', 'County',
    'Latitude', 'Longitude', 'Authorization Year', 'Authorization Date', 'End Date']

raw_df = raw_df[columns_of_interest]

CSV To Azure Container

In [10]:
# Specify the path to your JSON configuration file
config_file_path = 'config.json'

with open(config_file_path, 'r') as config_file:
    config = json.load(config_file)

# Azure connection to container
CONNECTION_STRING_AZURE_STORAGE = config["azure_connection_string"]
CONTAINER_AZURE = config["container_name"]

# Save DataFrame to CSV file and upload to Azure Blob Storage
# Convert DataFrame to CSV data (string)
csv_data = raw_df.to_csv(index=False)  

blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING_AZURE_STORAGE)
container_client = blob_service_client.get_container_client(CONTAINER_AZURE)

blob_name = 'snap_retailer_data.csv'

# Upload CSV data directly to Azure Blob Storage
upload_azure = blob_service_client.get_blob_client(container=CONTAINER_AZURE, blob=blob_name)
upload_azure.upload_blob(csv_data, overwrite=True) 

# List all blobs in the specified container
blob_list = container_client.list_blobs()
for blob in blob_list:
    print(blob.name)
    blob_client = container_client.get_blob_client(blob=blob.name)
    blob_data = blob_client.download_blob()
    blob_content = blob_data.readall().decode('utf-8')
    df = pd.read_csv(StringIO(blob_content))
    
df.info()

snap_retailer_data.csv


  df = pd.read_csv(StringIO(blob_content))


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 656429 entries, 0 to 656428
Data columns (total 16 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   Record ID           656429 non-null  int64  
 1   Store Name          656429 non-null  object 
 2   Store Type          656429 non-null  object 
 3   Street Number       656429 non-null  float64
 4   Street Name         656407 non-null  object 
 5   Additional Address  38288 non-null   object 
 6   City                656429 non-null  object 
 7   State               656429 non-null  object 
 8   Zip Code            656429 non-null  int64  
 9   Zip4                585633 non-null  object 
 10  County              656426 non-null  object 
 11  Latitude            656429 non-null  float64
 12  Longitude           656429 non-null  float64
 13  Authorization Year  656429 non-null  int64  
 14  Authorization Date  656429 non-null  object 
 15  End Date            405164 non-nul

Rename Columns

In [11]:
sql_df = raw_df.copy()

# Define the mapping of old column names to new column names
column_mapping = {
    'Record ID': 'record_id',
    'Store Name': 'store_name',
    'Store Type': 'store_type',
    'Street Number': 'street_number',
    'Street Name': 'street_name',
    'Additional Address': 'additional_address',
    'City': 'city',
    'State': 'state',
    'Zip Code': 'zip_code',
    'Zip4': 'zip4',
    'County': 'county',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
    'Authorization Year': 'authorization_year',
    'Authorization Date': 'authorization_date',
    'End Date' : 'end_date'
}

# Rename columns using the mapping
sql_df.rename(columns=column_mapping, inplace=True)
print(sql_df.info())

<class 'pandas.core.frame.DataFrame'>
Index: 656429 entries, 0 to 996558
Data columns (total 16 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   record_id           656429 non-null  int64         
 1   store_name          656429 non-null  object        
 2   store_type          656429 non-null  object        
 3   street_number       656429 non-null  float64       
 4   street_name         656429 non-null  object        
 5   additional_address  656429 non-null  object        
 6   city                656429 non-null  object        
 7   state               656429 non-null  object        
 8   zip_code            656429 non-null  int64         
 9   zip4                656429 non-null  object        
 10  county              656429 non-null  object        
 11  latitude            656429 non-null  float64       
 12  longitude           656429 non-null  float64       
 13  authorization_year  656429 non-nul

dim_storetype


In [12]:
sql_df['store_type'] = sql_df['store_type'].replace("Farmers' Market", "Farmers Market")

storetype_id_mapping = {
    'Convenience Store': 'CS',
    'Combination Grocery/Other': 'CO',
    'Large Grocery Store': 'LG',
    'Medium Grocery Store': 'MG',
    'Small Grocery Store': 'SG',
    'Supermarket': 'SM',
    'Super Store': 'SS',
    'Meat/Poultry Specialty': 'ME',
    'Delivery Route': 'DR',
    'Military Commissary': 'MC',
    'Farmers Market': 'FM',
    'Bakery Specialty': 'BK', 
    'Fruits/Veg Specialty': 'FV',
    'Food Buying Co-op': 'FB',
    'Seafood Specialty': 'SF',
    'Wholesaler': 'WS',  
    'Unknown': 'XX'
}

if 'store_type' in sql_df.columns:
    
    sql_df['storetype_id'] = sql_df['store_type'].map(storetype_id_mapping)
    
    dim_storetype = sql_df[['storetype_id', 'store_type']].drop_duplicates()

else:
    print("Mapping not successful  !!!")

dim_address

In [13]:
sql_df['street_name'] = sql_df['street_name'].str.title()
sql_df['additional_address'] = sql_df['additional_address'].str.title()
sql_df['city'] = sql_df['city'].str.title()
sql_df['county'] = sql_df['county'].str.title()
sql_df['address_id'] = sql_df['record_id']

Create Tables For Upload

In [14]:
fact_snap = sql_df[['record_id', 'storetype_id', 'address_id', 'latitude', 'longitude']].drop_duplicates(subset='record_id')
dim_store = sql_df[['record_id', 'store_name', 'authorization_year', 'authorization_date', 'end_date']].drop_duplicates(subset='record_id')
dim_storetype = sql_df[['storetype_id', 'store_type']].drop_duplicates(subset='storetype_id')
dim_address = sql_df[['address_id', 'street_number', 'street_name', 'additional_address',
                      'city', 'state', 'zip_code', 'zip4', 'county']].drop_duplicates(subset='address_id')

print(fact_snap.shape)
print(dim_store.shape)
print(dim_storetype.shape)
print(dim_address.shape)

#dim_storetype.to_csv("check_duplicate.csv",index=False, mode="w")

(614012, 5)
(614012, 5)
(17, 2)
(614012, 9)


SQL To PostgreSQL

In [21]:
# Load PostgreSQL connection details from JSON file
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

# PostgreSQL connection string
CONNECTION_STRING_POSTGRESQL = config["postgresql_connection_string"]

# Create SQLAlchemy engine
engine = create_engine(CONNECTION_STRING_POSTGRESQL)

try:
    
    fact_snap.to_sql('fact_snap', con=engine, if_exists='append', index=False)
    print("fact_snap data inserted successfully!")
    
    dim_store.to_sql('dim_store', con=engine, if_exists='append', index=False)
    print("dim_store data inserted successfully!")

    dim_storetype.to_sql('dim_storetype', con=engine, if_exists='append', index=False)
    print("dim_storetype data inserted successfully!")

    dim_address.to_sql('dim_address', con=engine, if_exists='append', index=False)
    print("dim_address data inserted successfully!")

except Exception as e:
    print(f"Error inserting data: {e}")

finally:
    engine.dispose()

fact_snap data inserted successfully!
dim_store data inserted successfully!
dim_storetype data inserted successfully!
dim_address data inserted successfully!
