In [5]:
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from decimal import Decimal
import csv
import getpass
import json
import os
import time
import xml.etree.ElementTree as ET

import pyodbc
import requests

In [6]:
# Load the weight tables used by compute_crime_gravity. Each JSON file maps a
# participant attribute value (age group, status, type) to a weight that is
# multiplied into the gravity score (presumably numeric — confirm file contents).
def _read_json_file(path):
    """Return the parsed contents of the JSON file at `path`."""
    with open(path) as handle:
        return json.load(handle)


dict_partecipant_age = _read_json_file('dict_partecipant_age.json')
dict_partecipant_status = _read_json_file('dict_partecipant_status.json')
dict_partecipant_type = _read_json_file('dict_partecipant_type.json')

In [7]:
# Derive the calendar attributes stored in the Date dimension from a timestamp string.
def compute_date_data(date_str):
    """Parse a 'YYYY-MM-DD HH:MM:SS' timestamp and derive calendar attributes.

    Args:
        date_str: timestamp text in '%Y-%m-%d %H:%M:%S' format.

    Returns:
        tuple: (date, day, month, year, quarter, day_of_week).
            - date is a datetime.date.
            - day, month and year are ints.
            - quarter is an int in 1..4.
            - day_of_week is the weekday name produced by strftime('%A'),
              which is locale-dependent.

    Raises:
        ValueError: if `date_str` does not match the expected format.
    """
    parsed = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
    # Months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4.
    quarter = 1 + (parsed.month - 1) // 3
    return (
        parsed.date(),
        parsed.day,
        parsed.month,
        parsed.year,
        quarter,
        parsed.strftime('%A'),
    )

In [8]:
# Parse dates.xml into a lookup from date primary key to the raw date text.
def parse_dates_xml(xml_file):
    """Build a {date_pk: date_text} mapping from a dates XML document.

    Every <row> element below the document root is read. Its <date_pk> child
    text is converted with int(), and its <date> child text is kept verbatim.
    When two rows share a date_pk, the later row wins.

    Args:
        xml_file: path or file object accepted by xml.etree.ElementTree.parse.

    Returns:
        dict mapping int date_pk -> date text.
    """
    mapping = {}
    document_root = ET.parse(xml_file).getroot()
    for row_element in document_root.iterfind('.//row'):
        date_text = row_element.find('date').text
        mapping[int(row_element.find('date_pk').text)] = date_text
    return mapping


In [9]:
# Function to compute crime gravity using the participant weight dictionaries
def compute_crime_gravity(x, age_weights=None, type_weights=None, status_weights=None):
    """Return the crime-gravity score for one participant record.

    The score is the product of three weights. Each weight is looked up by the
    record's 'participant_age_group', 'participant_type' and
    'participant_status' value. A value missing from its table contributes a
    neutral weight of 1.

    Args:
        x: mapping with the three keys above (e.g. a csv.DictReader row).
        age_weights: optional lookup table for 'participant_age_group';
            defaults to the module-level dict_partecipant_age.
        type_weights: optional lookup table for 'participant_type';
            defaults to the module-level dict_partecipant_type.
        status_weights: optional lookup table for 'participant_status';
            defaults to the module-level dict_partecipant_status.

    Returns:
        The product of the three weights.

    Raises:
        KeyError: if `x` lacks one of the three required keys.
    """
    # Fall back to the JSON-loaded globals so existing callers are unaffected.
    if age_weights is None:
        age_weights = dict_partecipant_age
    if type_weights is None:
        type_weights = dict_partecipant_type
    if status_weights is None:
        status_weights = dict_partecipant_status

    return (age_weights.get(x['participant_age_group'], 1)
            * type_weights.get(x['participant_type'], 1)
            * status_weights.get(x['participant_status'], 1))

In [11]:
# Reverse-geocode a coordinate pair to city/state via the Google Maps Geocoding API, retrying transient failures.
def get_location_info_with_retry(latitude, longitude, api_key, max_retries=3, timeout=10, retry_delay=1):
    """Return {"city": ..., "state": ...} for a latitude/longitude pair.

    Both values are taken from the top geocoding result:
      - city is the long_name of the first "locality" address component;
      - state is the long_name of the first "administrative_area_level_1"
        component.
    Either value is None when that component is absent. Both are None after
    `max_retries` unsuccessful attempts, or when the API answers ZERO_RESULTS.

    Args:
        latitude: any value accepted by float().
        longitude: any value accepted by float().
        api_key: Google Maps API key.
        max_retries: maximum number of HTTP attempts.
        timeout: per-request timeout in seconds, so a stalled connection
            cannot block a worker thread forever.
        retry_delay: seconds to wait between attempts.

    Raises:
        ValueError: if latitude/longitude cannot be converted to float.
    """
    latitude = float(latitude)
    longitude = float(longitude)
    url = "https://maps.googleapis.com/maps/api/geocode/json"
    # Passing the query via params lets requests URL-encode it, including the key.
    params = {"latlng": f"{latitude},{longitude}", "key": api_key}

    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=timeout)
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSONDecodeError is not a RequestException subclass.
            print(f"Error in geocoding request (attempt {attempt + 1}/{max_retries}): {e}")
        else:
            results = data.get("results")
            if results:
                address_components = results[0].get("address_components", [])
                city = next((component["long_name"] for component in address_components if "locality" in component["types"]), None)
                state = next((component["long_name"] for component in address_components if "administrative_area_level_1" in component["types"]), None)
                return {"city": city or None, "state": state}
            if data.get("status") == "ZERO_RESULTS":
                # A definitive "nothing here": retrying cannot change the answer.
                break
        # Back off before the next attempt. This also covers quota/server errors
        # that return no results. Do not sleep after the final attempt.
        if attempt + 1 < max_retries:
            time.sleep(retry_delay)

    return {"city": None, "state": None}



In [12]:
# Database connection settings.
# Credentials are read from the environment (or prompted for) instead of being
# committed with the notebook. The password previously hardcoded here should be
# considered leaked and rotated.
server = os.environ.get('LDS_DB_SERVER', 'tcp:lds.di.unipi.it')
username = os.environ.get('LDS_DB_USER', 'Group_ID_200')
password = os.environ.get('LDS_DB_PASSWORD') or getpass.getpass(f'Password for {username}: ')
database = os.environ.get('LDS_DB_NAME', 'Group_ID_200_DB')
connectionString = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password

In [13]:
# Connect to the SQL Server database.
# `conn` and `cursor` are module-level globals: split_and_integrate reads them
# directly rather than taking them as arguments.
conn = pyodbc.connect(connectionString)
cursor = conn.cursor()

In [15]:
# Insert a row whose primary key is supplied by the caller (no surrogate key), at most once per distinct row.
def insert_data_with_ID(conn, cursor, id_dict, table_name, key_dict):
    """Insert one row with a caller-supplied id unless an identical row was already inserted.

    `id_dict` is a per-table cache keyed by the tuple of `key_dict` values. A row
    already in the cache is not inserted again. The cached value is the first
    value of `key_dict`; callers put the id column first.

    Args:
        conn: unused; kept for signature compatibility (callers commit).
        cursor: DB-API cursor using '?' placeholders (pyodbc).
        id_dict: cache dict, mutated in place.
        table_name: target table. It is interpolated into the SQL, so it must
            be a trusted identifier.
        key_dict: ordered column -> value mapping for the row.

    Returns:
        The row's id (first value of `key_dict`).
    """
    key_tuple = tuple(key_dict.values())
    if key_tuple in id_dict:
        return id_dict[key_tuple]

    columns = ', '.join(key_dict.keys())
    placeholders = ', '.join(['?'] * len(key_dict))
    insert_query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders});'
    cursor.execute(insert_query, list(key_dict.values()))

    # Cache only after the INSERT succeeded; otherwise a failed row would be
    # remembered as inserted and silently skipped forever.
    row_id = next(iter(key_dict.values()))
    id_dict[key_tuple] = row_id
    return row_id

In [16]:
# Insert a row into a table whose id is generated by the database (surrogate key) and return that id.
def insert_data_without_ID(conn, cursor, table_name, key_dict, identity_column='YourIdentityColumn'):
    """Insert `key_dict` as one row and return the database-generated id.

    SQL Server's `OUTPUT INSERTED.<identity_column>` clause makes the INSERT
    itself return the new id, which is read back with fetchone().

    Args:
        conn: unused; kept for signature compatibility.
        cursor: DB-API cursor using '?' placeholders (pyodbc).
        table_name: target table. It is interpolated into the SQL, so it must
            be a trusted identifier.
        key_dict: ordered column -> value mapping for the row.
        identity_column: name of the generated id column to return.

    Returns:
        The first column of the row returned by the INSERT, i.e. the new id.
    """
    column_names = list(key_dict)
    sql = (
        f'INSERT INTO {table_name} ({", ".join(column_names)}) '
        f'OUTPUT INSERTED.{identity_column} '
        f'VALUES ({", ".join("?" for _ in column_names)});'
    )
    cursor.execute(sql, [key_dict[name] for name in column_names])
    return cursor.fetchone()[0]

In [None]:
def get_or_insert_id(conn, cursor, id_dict, table_name, key_dict, identity_column='IdentityColumn_id'):
    """Return the surrogate id for the row described by `key_dict`, inserting it on first sight.

    `id_dict` caches previously seen rows, keyed by the tuple of `key_dict`
    values, so each distinct combination is inserted only once. A cache miss
    delegates to insert_data_without_ID and stores the id it returns.

    Args:
        conn, cursor: passed through to insert_data_without_ID.
        id_dict: per-table cache, mutated in place.
        table_name: target table.
        key_dict: ordered column -> value mapping for the row.
        identity_column: generated id column to read back.

    Returns:
        The cached or newly generated id.
    """
    cache_key = tuple(key_dict.values())
    if cache_key not in id_dict:
        id_dict[cache_key] = insert_data_without_ID(conn, cursor, table_name, key_dict, identity_column)
    return id_dict[cache_key]

In [None]:
# Per-table caches mapping a tuple of row values to that row's id, so each
# distinct row is inserted only once per load.
geo_id_dict, gun_id_dict, partecipant_id_dict = {}, {}, {}
date_id_dict, incident_id_dict, custody_id_dict = {}, {}, {}

In [None]:
# Lookup from int date_pk to the raw timestamp text in dates.xml. Police.csv's
# date_fk column is resolved through this mapping, and the text is later parsed
# by compute_date_data as '%Y-%m-%d %H:%M:%S'.
date_mapping = parse_dates_xml('dates.xml')

In [25]:
# Read Police.csv and insert its records into the database tables (city and state are filled in later).
def _clear_tables(table_names):
    """Empty each table in `table_names`, in order, committing after every DELETE."""
    for table_name in table_names:
        cursor.execute(f'DELETE FROM {table_name}')
        conn.commit()


def _integrate_row(row):
    """Insert the dimension rows and the Custody row for one Police.csv record.

    Gun, Partecipant and Geography get database-generated ids, reused from the
    caches when the same values were seen before. Date, Incident and Custody use
    the ids present in the CSV.
    """
    gun_key = {
        "is_stolen": 1 if row["gun_stolen"] == 'Stolen' else 0,
        "gun_type": row['gun_type'],
    }
    gun_id = get_or_insert_id(conn, cursor, gun_id_dict, 'Gun', gun_key, "gun_id")

    partecipant_key = {
        "age_group": row['participant_age_group'],
        "gender": row['participant_gender'],
        "type": row['participant_type'],
        "status": row['participant_status'],
    }
    partecipant_id = get_or_insert_id(conn, cursor, partecipant_id_dict, 'Partecipant', partecipant_key, "partecipant_id")

    # Round-trip through float so different spellings of the same coordinate
    # normalize to the same Geography cache key.
    latitude, longitude = float(row['latitude']), float(row['longitude'])
    geo_key = {
        "latitude": str(latitude),
        "longitude": str(longitude),
        "country": "United States",
        "continent": "North America",
    }
    geo_id = get_or_insert_id(conn, cursor, geo_id_dict, 'Geography', geo_key, "geo_id")

    # date_fk is a key into date_mapping (parsed from dates.xml).
    date_id = int(row['date_fk'])
    date, day, month, year, quarter, day_of_week = compute_date_data(date_mapping[date_id])
    date_key = {
        "date_id": date_id, "the_date": date, "the_day": day, "the_month": month,
        "the_year": year, "quarter": quarter, "day_of_week": day_of_week,
    }
    insert_data_with_ID(conn, cursor, date_id_dict, "Date", date_key)

    incident_id = int(row['incident_id'])
    insert_data_with_ID(conn, cursor, incident_id_dict, "Incident", {"incident_id": incident_id})

    custody_key = {
        "custody_id": row['custody_id'],
        "partecipant_id": partecipant_id,
        "gun_id": gun_id,
        "geo_id": geo_id,
        "date_id": date_id,
        "crime_gravity": compute_crime_gravity(row),
        "incident_id": incident_id,
    }
    insert_data_with_ID(conn, cursor, custody_id_dict, 'Custody', custody_key)


def split_and_integrate(csv_file, batch_size=10000):
    """Reload the database tables from a Police CSV export.

    Steps:
      1. Empty all target tables and the module-level id caches.
      2. Insert every CSV record via _integrate_row.
      3. Commit every `batch_size` successfully loaded rows, and once more at
         the end.

    A record that raises is reported and skipped, so one bad line cannot stall
    the load. Uses the module-level `conn` / `cursor` and *_id_dict caches.

    Args:
        csv_file: path to the CSV file; its first line must be the header.
        batch_size: positive number of successfully loaded rows between commits.
    """
    # Custody references the other tables' ids, so it is emptied first
    # (presumably to satisfy FK constraints — confirm against the schema).
    _clear_tables(['Custody', 'Geography', 'Gun', 'Date', 'Incident', 'Partecipant'])
    # The caches describe rows that were just deleted; keeping them across a
    # re-run would reuse ids that no longer exist.
    for cache in (geo_id_dict, gun_id_dict, partecipant_id_dict, date_id_dict, incident_id_dict, custody_id_dict):
        cache.clear()

    row_count = 0
    skipped = 0
    # newline='' lets the csv module handle quoted fields that contain line breaks.
    with open(csv_file, 'r', newline='') as csvfile:
        # DictReader consumes the header row itself; an extra next() would drop the first record.
        reader = csv.DictReader(csvfile)
        for row in reader:
            try:
                _integrate_row(row)
            except Exception as e:
                # Skip the record instead of rewinding the file: rewinding retried
                # a deterministic error forever.
                skipped += 1
                print(f"Error processing CSV line {reader.line_num}: {e}")
                continue

            row_count += 1
            if row_count % batch_size == 0:
                print(row_count)
                conn.commit()

    # Commit any remaining records
    conn.commit()
    if skipped:
        print(f"Skipped {skipped} rows that raised errors")




In [26]:
# Run the full load from Police.csv. The connection is closed even if the load
# fails part-way, so no session is left open on the server.
try:
    split_and_integrate('Police.csv')
finally:
    cursor.close()
    conn.close()


In [None]:
# Fill in city/state for Geography rows that still lack them, using reverse geocoding.
conn = pyodbc.connect(connectionString)
cursor = conn.cursor()
try:
    # Selecting all rows with city and state which are NULL
    cursor.execute("SELECT latitude, longitude FROM Geography WHERE city IS NULL OR state IS NULL")
    records = cursor.fetchall()
    print(len(records))

    # Read the key from the environment instead of committing it with the notebook.
    # The key previously hardcoded here should be considered leaked and revoked.
    google_maps_api_key = os.environ.get("GOOGLE_MAPS_API_KEY") or getpass.getpass("Google Maps API key: ")

    # Batch size for committing changes
    batch_size = 1000
    update_query = "UPDATE Geography SET city = ?, state = ? WHERE latitude = ? AND longitude = ?"
    processed = 0
    failed = 0

    # Geocoding is network-bound, so requests for one batch run in parallel threads.
    with ThreadPoolExecutor(max_workers=8) as executor:
        for start_index in range(0, len(records), batch_size):
            batch_records = records[start_index:start_index + batch_size]
            location_info_list = list(executor.map(
                lambda record: get_location_info_with_retry(*record, google_maps_api_key),
                batch_records,
            ))

            print("Processing batch:", start_index // batch_size + 1)
            for offset, (record, info) in enumerate(zip(batch_records, location_info_list)):
                try:
                    cursor.execute(update_query, (info["city"], info["state"], *record))
                except Exception as e:
                    # Report and keep going: aborting here silently dropped the rest of the batch.
                    failed += 1
                    print(f"Error processing row {start_index + offset}: {e}")

            conn.commit()
            # Count the rows actually in this batch (the last batch may be short).
            processed += len(batch_records)
            print("Processed rows:", processed)

    if failed:
        print("Rows that failed to update:", failed)
finally:
    # Close the cursor and connection even if geocoding or an UPDATE raised.
    cursor.close()
    conn.close()
