In [None]:
import pandas as pd
from io import StringIO
import os
import numpy
import datetime as datetime
import boto3
from io import BytesIO

#replace path with correct path
os.chdir("/home/ec2-user/Contacts-sensitive/")
import functions.aws_um_funcs as funcs
#from functions.aws_um_funcs import *

#from meat_packers.mp_alt import *
from meat_packers.turnover import *
from meat_packers.mp_alt_custom import *
import botocore


In [None]:
# Goal:
# Automatically generate wide format data for all packers, indexed by date
#
# - Iterate thru mpdata, if valid, read from s3
# - Create new df indexed by date in given date range, 2nd column packer, 3rd column fips
# - filter to only connections at the facility
# - add columns for: #devices/day, #new devices/day, #separations, *add more as necessary
# - concat this to large df
# - sort by date
# - write df to s3 in some format, parquet? (do when rest working)
#
#
# Questions/Considerations: 
# What time period to consider for separation? default infinity
# Calculate turnover now and put in this df, or use this df to calculate turnover later?
# Add Covid rates?

In [None]:
def generate_sum_stats(mp_path, start_date, end_date):
    """
    generates the summary dataframe for all packers. saves each one to s3 to be combined later
    """
    mpdata = get_mp_cleaned(mp_path)
    need_dates = []
        
    if end_date > date(2020,8,24):
        need_dates1 = funcs.daterange(date(2020,1,1), date(2020,8,24))
        need_dates2 = funcs.daterange(date(2020,8,26), end_date)
        need_dates = [*need_dates1, *need_dates2]
    else:
        need_dates = funcs.daterange(date(2020,1,1), end_date)
    
    s3 = boto3.resource('s3')
    
    for mp_index in range(0,404):
        
        try:
            existing_s3_key = 'meat_packers/summary/'+ str(mp_index) + '.parquet'
            s3.Object('yse-bioecon', existing_s3_key).load()
            print(f"{mp_index} already exists")
            continue
        except:
            print(f"generating {mp_index}")
        mp_row = mpdata[mp_index:mp_index+1]
        
        company_name = str(mp_index) + "_" + mp_row["company"].iloc[0].replace(" ", "_")
        fips_code = mp_row["geoid"].iloc[0]
        s3_key = 'meat_packers/contacts-dc/'+ fips_code + "/" + company_name + '.parquet'

        try:
            s3.Object('yse-bioecon', s3_key).load()
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print(f"skipping packer, {s3_key} does not exist")
                # The object does not exist.
                
            else:
                # Something else has gone wrong.
                print("skipping packer")
                
            continue

        else:
            # The object does exist.
            packer_poly = mp_poly(mp_row)
            
            if len(packer_poly) <= 0:
                continue
            packer_df = generate_packer_df(s3_key, need_dates, packer_poly.iloc[0]["address"], packer_poly, company_name, fips_code)
            try:
                my_buffer = BytesIO()
                packer_df.to_parquet(my_buffer)
                s3.Object('yse-bioecon', 'meat_packers/summary/' + str(mp_index) + '.parquet').put(Body=my_buffer.getvalue())
            except:
                print("no contacts detected")
    return

def create_wide_df(mp_path):
    """
    combines individual summary files into one large dataframe
    """
    s3 = boto3.resource('s3')
       
    dfs = []
    for i in range(404):
       

        s3_key = 'meat_packers/summary/' + str(i) + '.parquet'
        try:
            s3.Object('yse-bioecon', s3_key).load()
            data = getpardata('yse-bioecon', s3_key)
            dfs.append(data)
        except:
            continue
    wide = pd.concat(dfs) if len(dfs) > 1 else dfs[0]
    
       
    wide = wide.reset_index()
    wide = wide.sort_values(["date_dt"]).reset_index(drop=True)
    
    return wide
    
    
        

In [None]:
def get_mp_cleaned(mp_path):
    mpdata = pd.read_csv(mp_path)
    mpdata = mpdata[mpdata['verified'] == 1]
    mpdata = mpdata[mpdata['close_to'] == 0]
    mpdata = mpdata[mpdata['polyid'].notna()]
    mpdata = mpdata.drop(['verified', 'close_to', 'census_tract.y'], axis = 1)
    mpdata = mpdata.rename(columns = {'census_tract.x' : 'census_tract'})
    mpdata['geoid'] = mpdata['fips_code'].apply(lambda x: ("0" + str(x)) if len(str(x)) == 4 else str(x))
    mpdata['path'] = mpdata['state'].apply(lambda x: x.lower()) + "/" + mpdata['geoid'] + "/"
    mpdata.reset_index(drop = True, inplace =True)
    #make mpdata spatial
    mpdata = gpd.GeoDataFrame(mpdata, geometry = gpd.points_from_xy(mpdata.lon, mpdata.lat)).set_crs(epsg=4326)
    return mpdata

In [None]:
#TODO split this into more functions, make more efficient. Current implementation is slow
def generate_packer_df(s3_key, need_dates, address, packer_poly, company_name, fips_code):
    
    """
    generates summary data for a given packer
    total_contacts, different_fips, num_contacts_packer, unique_devices_packer, new_devices, separations, mean_cel, std_cel
    
    """
    
    data = getpardata('yse-bioecon', s3_key)
   
    if len(data) <= len(need_dates):
        print(company_name)
        print("abort!")
        return pd.DataFrame()
    
    data["index0"] = range(len(data))
    
    #TODO error, to_datetime fails on a few packers, not sure why
    try:
        data["date_dt"] = pd.to_datetime(data["date"], format='%Y%m%d')
    except:
        return
    
    #total contacts, different fips, contacts at packer
    gb_date_full = data.groupby(["date_dt"])["device_id_1"]
    total_contacts = gb_date_full.count().reset_index().rename(columns={'device_id_1': 'total_contacts'}).set_index('date_dt')
    
    data_diff_fips = data[data.fips_code != str(fips_code)]
    gb_date_fips = data_diff_fips.groupby(["date_dt"])["device_id_1"]
    different_fips = gb_date_fips.count().reset_index().rename(columns={'device_id_1': 'different_fips'}).set_index('date_dt')
    
    at_packer = data[data.address == address]
    gb_date = at_packer.groupby(["date_dt"])["device_id_1"]
    num_contacts_packer = gb_date.count().reset_index().rename(columns={'device_id_1': 'at_packer'}).set_index('date_dt')

    
    #empty dataframes to be filled, can remove most of these but left in for now
    date_df = pd.DataFrame()
    unique_devices_packer = pd.DataFrame()
    new_devices = pd.DataFrame()
    separations = pd.DataFrame()
    mean_cel = pd.DataFrame()
    
    
    #common evening locations
    std_cel = pd.DataFrame()
    cel_series = pd.concat([data[["date_dt", "cel_distance_m_1"]].rename(columns={'cel_distance_m_1': 'cel_distance'}), 
                            data[["date_dt", "cel_distance_m_2"]].rename(columns={'cel_distance_m_2': 'cel_distance'})])

    cel_gb = cel_series.groupby(["date_dt"])
    mean_cel = cel_gb.mean().rename(columns={'cel_distance': 'mean_cel'})
    std_cel = cel_gb.std().rename(columns={'cel_distance': 'std_cel'})
    
    
    
    #duplicates each contact into one row with device 1, one row with device 2
    long_data = pd.wide_to_long(at_packer, ["device_id_"], i='index0', j="test")
    
    
    #unique devices at the packing facility
    gb_date_long = long_data.groupby(["date_dt"])["device_id_"]
    unique_devices_packer = gb_date_long.nunique().reset_index().rename(columns={'device_id_': 'unique_devices'})
    unique_devices_packer.set_index('date_dt', inplace=True)
       
    #calculate new devices at packer    
    #days_since_last_visit = long_data.sort_values(['device_id_', 'date_dt']).groupby('device_id_')['date_dt'].diff().dt.days
    days_since_last_visit = long_data.sort_values(['device_id_', 'date_dt']).groupby('device_id_')['date_dt'].diff()
    asc_data = long_data.sort_values(["device_id_", "date_dt"])
    asc_data["days_since_last_appearance"] = days_since_last_visit
    asc_data = asc_data[asc_data["days_since_last_appearance"].isna()].drop(['usecode', 'neighborhood', 'zoning', 'numstories', 'sqft', 'rdi',
           'unixtime_2', 'speed_kph_1', 'census_block', 'parval',
           'numunits', 'cel_distance_m_2', 'saleprice',
           'local_time_1', 'lat', 'geoid', 'structstyle', 'unixtime_1', 'mailadd',
           'gisacre', 'agval', 'usedesc', 'mail_state2',
           'local_time_2', 'cbg', 'scity', 'usps_vacancy_date', 'lbcs_activity',
           'lbcs_function', 'parcelnumb', 'lbcs_function_desc', 'index_right',
           'census_blockgroup', 'dpv_status', 'll_gissqft', 'datasource_id_2',
           'cel_distance_m_1', 'mail_city', 'yearbuilt', 'distance_m',
           'lbcs_activity_desc', 'census_tract', 'owntype', 'parvaltype', 'city',
           'szip', 'lon', 'll_gisacre', 'mail_zip', 'speed_kph_2',
           'geometry', 'zoning_description', 'usps_vacancy', 'saledate',
           'datasource_id_1', 'date', 'address', 'days_since_last_appearance', 'crap', "fips_code"], axis=1)
    
    new_devices = asc_data.groupby('date_dt').count().reset_index().rename(columns={'device_id_': 'new_devices'}).reset_index(drop=True)
    new_devices.set_index('date_dt', inplace=True)

    #calculate separations
    #days_to_next_visit = long_data.sort_values(by=['device_id_', 'date_dt'], ascending=False).groupby('device_id_')['date_dt'].diff().dt.days
    days_to_next_visit = long_data.sort_values(by=['device_id_', 'date_dt'], ascending=False).groupby('device_id_')['date_dt'].diff()

    dsc_data = long_data.sort_values(["device_id_", "date_dt"], ascending=False)
    dsc_data["days_to_next_visit"] = days_to_next_visit
    
    dsc_data = dsc_data[dsc_data["days_to_next_visit"].isna()].drop(['usecode', 'neighborhood', 'zoning', 'numstories', 'sqft', 'rdi',
           'unixtime_2', 'speed_kph_1', 'census_block', 'parval',
           'numunits', 'cel_distance_m_2', 'saleprice',
           'local_time_1', 'lat', 'geoid', 'structstyle', 'unixtime_1', 'mailadd',
           'gisacre', 'agval', 'usedesc', 'mail_state2',
           'local_time_2', 'cbg', 'scity', 'usps_vacancy_date', 'lbcs_activity',
           'lbcs_function', 'parcelnumb', 'lbcs_function_desc', 'index_right',
           'census_blockgroup', 'dpv_status', 'll_gissqft', 'datasource_id_2',
           'cel_distance_m_1', 'mail_city', 'yearbuilt', 'distance_m',
           'lbcs_activity_desc', 'census_tract', 'owntype', 'parvaltype', 'city',
           'szip', 'lon', 'll_gisacre', 'mail_zip', 'speed_kph_2',
           'geometry', 'zoning_description', 'usps_vacancy', 'saledate',
           'datasource_id_1', 'date', 'address', 'days_to_next_visit', 'crap', "fips_code"], axis=1)
   
    
    separations = dsc_data.sort_values(["date_dt"]).groupby('date_dt').count().reset_index().rename(columns={'device_id_': 'separations'}).reset_index(drop=True)
    separations.set_index('date_dt', inplace=True)
    
    

    #setup dataframe to return
    date_df = pd.DataFrame({'date_dt': need_dates})
    date_df["date_dt"] = pd.to_datetime(date_df["date_dt"], format='%Y%m%d')
    date_df = date_df.set_index(["date_dt"])
    
    result = pd.concat([total_contacts, different_fips, num_contacts_packer, unique_devices_packer, new_devices, separations, date_df, mean_cel, std_cel], join="outer", axis=1).fillna(0)
    result["fips_code"] = fips_code
    result["company_name"] = company_name
    while 'use' in result.columns:
        result = result.drop(['use'], axis=1)
    print('success')
    return result

def mp_poly(mpdata):
    #copied from mp_alt
    #select a row from the packer, mpdata, geodataframe
    tmp = mpdata[0:1]

    
    #get the gpkg file -- this needs to be seperate because it is slow, and we can group this stream. 
    tmp_geoid = tmp['geoid'].iloc[0]
    tmp_gpkg =  funcs.getgpkg_gz('yse-bioecon', 'landgrid_geoid/' + tmp_geoid + ".gpkg.gz")

    #Gets the polygon associated with a single meatpacker
    packer_tmp = get_packer_poly(tmp, tmp_gpkg)
    return packer_tmp

In [None]:
generate_sum_stats("s3://yse-bioecon/meat_packers/meat_packers.csv", date(2020,1,1), date(2021,4,1))

In [None]:
test = create_wide_df("s3://yse-bioecon/meat_packers/meat_packers.csv")

In [None]:
test = test[test.company_name != "5_DEAN_SAUSAGE_CO_INC"] #this one never has any contacts, slipped through filter

In [None]:
test.describe()
#total_contacts: how many contacts workers from a given packer made in a day
#different_fips: how many of said contacts were in a different county from the packing facility
#at_packer: how many contacts were at the facility
#unique devices: how many unique devices were at the facility
#new devices: number of devices at facility which have never beenn seen there before
#separations: number of devices at facility which are never seen again
#mean cel: average distance from common evening location of all devices
#std cel: standard deviation of above

In [None]:
test[0:5]

In [None]:
test_gb_date = test.groupby(["date_dt"])
ratio = test_gb_date.different_fips.mean() / test_gb_date.total_contacts.mean()

In [None]:
ratio.plot()

In [None]:
test[test. <= 20000].total_contacts.hist(bins=1000)

In [None]:
test[["total_contacts", "different_fips", "at_packer"]].plot()

In [None]:
test[["mean_cel"]].plot()

In [None]:
test_gb_date.total_contacts.mean().plot()

In [None]:
nyt2020 = pd.read_csv('https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties-2020.csv')
nyt2021 = pd.read_csv('https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties-2021.csv')
nytcovid = pd.concat([nyt2020, nyt2021]).rename(columns={'fips': 'fips_code'}).drop(["state", "county"], axis=1)
nytcovid['fips_code'] = nytcovid['fips_code'].apply(lambda x: ("0" + str(x))[:-2] if len(str(x)) == 6 else str(x)[:-2])
nytcovid["date"] = pd.to_datetime(nytcovid["date"], format='%Y-%m-%d')
test = test.rename(columns={'date_dt': 'date'})
test = test.merge(nytcovid, how="left", on=['date', 'fips_code'])

In [None]:
test

In [None]:
import matplotlib.pyplot as plt

In [None]:
test_gb_date = test.groupby(["date"])
#ratio over time
summary = pd.DataFrame()
summary["ratio"] = test_gb_date.different_fips.mean() / test_gb_date.total_contacts.mean()
#summary["deaths"] = test_gb_date.deaths.mean()
plt.figure()
summary.plot(figsize=(10,10))
plt.xlabel('Date')
plt.ylabel('out/in county ratio')
plt.legend(loc='upper left')
plt.title("Ratio of In vs Out of County Contacts")

In [None]:
summary = pd.DataFrame()
summary["total_contacts"]= test_gb_date.total_contacts.mean()
summary["different_fips"] = test_gb_date.different_fips.mean()
summary["att_packer"] = test_gb_date.at_packer.mean()
#summary["deaths"] = test_gb_date.deaths.mean()
plt.figure()
summary.plot(figsize=(10,10))
plt.xlabel('Date')
plt.ylabel('# Contacts')
#plt.legend(loc='upper left')

plt.title("Average Contacts In vs Out of Packer County")

In [None]:
summary = pd.DataFrame()
summary["deaths"] = test_gb_date.deaths.mean()
summary["cases"] = test_gb_date.cases.mean()
plt.figure()
summary.plot(figsize=(10,10))
plt.xlabel('Date')
plt.ylabel('Number of People')
#plt.legend(loc='upper left')

plt.title("Deaths vs Cases")