# Import Libraries

In [58]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from sqlalchemy import create_engine
pd.set_option('display.max_columns', None)

# Step 1: Retrieve Data from Source


In [59]:
# Load new data
ccc_data_url = 'https://raw.githubusercontent.com/nonviolent-action-lab/crowd-counting-consortium/master/ccc_compiled_2021-present.csv'
ccc_data = pd.read_csv(ccc_data_url, encoding='ISO-8859-1')

# Load the existing dataset (GeoJSON)
existing_data = gpd.read_file('data_engineer_tech_challenge_existing_dataset.geojson')

# Load County Dataset for Spatial Join (JSON)
county_gdf = gpd.read_file('https://raw.githubusercontent.com/plotly/datasets/master/geojson-counties-fips.json')
county_gdf.columns = county_gdf.columns.str.lower()
county_gdf = county_gdf[['Institute additional data validation or cleaningname', 'geometry']].copy().rename(columns={'name': 'event_county'})
county_gdf['event_county'] = county_gdf['event_county'].str.title()

  ccc_data = pd.read_csv(ccc_data_url, encoding='ISO-8859-1')


In [None]:
# Naive SPC implementation
st1 = {
    'New': ccc_data.shape,
    'Existing': existing_data.shape
}

In [60]:
ccc_data.head(1)

Unnamed: 0,date,locality,state,location_detail,online,type,title,macroevent,actors,organizations,participants,claims,valence,issues,issues_major,size_text,size_low,size_high,size_mean,size_cat,arrests,arrests_any,injuries_crowd,injuries_crowd_any,injuries_police,injuries_police_any,property_damage,property_damage_any,chemical_agents,participant_measures,police_measures,participant_deaths,police_deaths,source_1,source_2,source_3,source_4,source_5,source_6,source_7,source_8,source_9,source_10,source_11,source_12,source_13,source_14,source_15,source_16,source_17,source_18,source_19,source_20,source_21,source_22,source_23,source_24,source_25,source_26,source_27,source_28,source_29,source_30,notes,lat,lon,resolved_locality,resolved_county,resolved_state,fips_code
0,2021-01-01,Montgomery,AL,statewide,0.0,strike; boycott,,,Free Alabama Movement; prisoners,Free Alabama Movement,prisoners,"against prison labor, for safer conditions in ...",1.0,covid; criminal justice; labor,covid; criminal justice; labor,,,,,0,,0,,0,,0,,0,0,,,,,https://sfbayview.com/2020/12/fam-launches-30-...,https://twitter.com/ShutDownRacism/status/1346...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Scheduled to run 30 days.,32.379223,-86.307737,Montgomery,Montgomery County,AL,1101.0


In [61]:
existing_data.head(1)

Unnamed: 0,id,event_id,event_date,event_state,event_county,event_city,event_type,event_source,geometry
0,0,1,2021-04-21,TX,Travis,Austin,Demonstrations,Crowd Counting Consortium,POINT (-97.74035 30.27467)


In [62]:
county_gdf.head(1)

Unnamed: 0,event_county,geometry
0,Autauga,"POLYGON ((-86.49677 32.34444, -86.71790 32.402..."


# Step 2: Format Data to Match Existing Dataset

In [71]:
# Selecting relevant columns and renaming them to match existing dataset
relevant_columns = {
    'date': 'event_date',
    'state': 'event_state',
    'locality': 'event_city',
    'type': 'event_type',
    'lat': 'latitude',
    'lon': 'longitude',
    'source_1':'event_source'
}
transformed_new_data = ccc_data[relevant_columns.keys()].copy()
transformed_new_data.rename(columns=relevant_columns, inplace=True)

# Creating geometry column from latitude and longitude
transformed_new_data['geometry'] = transformed_new_data.apply(
    lambda row: Point(row['longitude'], row['latitude']), axis=1
)
transformed_new_data = gpd.GeoDataFrame(transformed_new_data, geometry='geometry')

In [72]:
# Deduplicate New Data
transformed_new_data.drop_duplicates(keep='first',inplace=True)

# Drop fields to ensure new and existing data have the same format
transformed_new_data.drop(['latitude','longitude'],axis = 1, inplace=True)

# Convert Event Type into list to allow for counts and other statistical measures
existing_data['event_type'] = existing_data['event_type'].str.split(';')
transformed_new_data['event_type'] = transformed_new_data['event_type'].str.split(';')

In [None]:
# Naive SPC implementation
st2 = {
    'New': transformed_new_data.shape,
    'Existing': existing_data.shape
}

In [73]:
transformed_new_data.head(1)

Unnamed: 0,event_date,event_state,event_city,event_type,event_source,geometry
0,2021-01-01,AL,Montgomery,"[strike, boycott]",https://sfbayview.com/2020/12/fam-launches-30-...,POINT (-86.30774 32.37922)


# Step 3: Spatial Join to Create 'event_county' Column


In [65]:
# https://shapely.readthedocs.io/en/latest/manual.html#binary-predicates
transformed_new_data = gpd.sjoin(transformed_new_data, county_gdf, how='left', op='intersects')
transformed_new_data.drop(columns=['index_right'], inplace=True)

  if (await self.run_code(code, result,  async_=asy)):
Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: None
Right CRS: EPSG:4326

  transformed_new_data = gpd.sjoin(transformed_new_data, county_gdf, how='left', op='intersects')


In [None]:
# Naive SPC implementation
st3 = {
    'New': transformed_new_data.shape,
    'Existing': existing_data.shape
}

In [66]:
transformed_new_data.head(1)

Unnamed: 0,event_date,event_state,event_city,event_type,event_source,geometry,event_county
0,2021-01-01,AL,Montgomery,strike; boycott,https://sfbayview.com/2020/12/fam-launches-30-...,POINT (-86.30774 32.37922),Montgomery


# Step 4: Perform Rudimentary Deduplication

In [67]:
deduplication_criteria = ['event_date', 'event_state', 'event_city', 'event_type']
duplicates = transformed_new_data[transformed_new_data[deduplication_criteria].isin(existing_data[deduplication_criteria]).all(axis=1)]
deduplicated_new_data = transformed_new_data.drop(duplicates.index)

# Include event_id column as sequential range
deduplicated_new_data['event_id'] = range(max(existing_data.event_id)+1,deduplicated_new_data.shape[0] + max(existing_data.event_id) + 1,1)

In [68]:
deduplicated_new_data.head(1)

Unnamed: 0,event_date,event_state,event_city,event_type,event_source,geometry,event_county,event_id
0,2021-01-01,AL,Montgomery,strike; boycott,https://sfbayview.com/2020/12/fam-launches-30-...,POINT (-86.30774 32.37922),Montgomery,2244


In [None]:
# Naive SPC implementation
st4 = {
    'New': transformed_new_data.shape,
    'Existing': existing_data.shape
}

# Step 5: Load Data into a Database

In [69]:
# # Simulated database connection
# engine = create_engine('postgresql://username:password@localhost:5432/database')

# # Loading data into the database
# deduplicated_new_data.to_sql('table_name', con=engine, index=False, if_exists='append')

## Additional Considerations for Data Engineer Task

### Data Quality and Integrity
- **Validation Checks:** Implement comprehensive checks for data accuracy, including missing values, data types, and outliers.
- **Error Handling:** Robust mechanisms to manage exceptions during data extraction, transformation, or loading.

### Performance Optimization
- **Batch Processing:** Use batch processing for large datasets to handle data in chunks.
- **Parallel Processing:** Apply parallel processing techniques for computationally intensive tasks.

### Data Security and Compliance
- **Data Encryption:** Ensure encryption during data transfer and storage like AES or regular Hash
- **Regulatory Compliance:** Adhere to regulations like GDPR, HIPAA for data protection.

### Logging and Monitoring
- **Detailed Logging:** Implement logging at each ETL stage for auditing and troubleshooting.
- **Monitoring and Alerts:** Set up monitoring and alerts for system failures or performance issues.

### Testing
- **Unit and Integration Testing:** Conduct regular testing to catch issues early.
- **Load Testing:** Perform load testing to evaluate performance under heavy loads.
