In [42]:
""" Trip Length and Speed Calculation

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
"""

from collections import namedtuple
import logging
import sys


from geopy.distance import great_circle
import pandas as pd
import geopandas as gpd
import numpy as np

from datetime import timedelta, date, datetime
from statistics import *
from shapely.geometry import MultiPolygon, Polygon
from shapely import wkt


from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType,
    LongType,
    StructField,
    IntegerType,
    StringType,
    DoubleType,
    TimestampType,
    ArrayType
)
import pyspark.sql.functions as F
from math import *
import time
import json

import matplotlib.pyplot as plt
%matplot inline


spark = SparkSession.builder.appName(f"trips_veraset").getOrCreate()
spark.sparkContext.addPyFile("s3://ipsos-dvd/scripts/utils.py")
from utils import *


data_dir = "s3://ipsos-dvd/ukr/data/"


""" Spatial join in pandas_udf """
@pandas_udf(StringType())
def sjoin(x,y):

    crs = 'epsg:2587'
    ids = pd.DataFrame.from_records([json.loads(c) for c in gdf_broadcast.value])
    polygons = [wkt.loads(w) for w in ids['geometry']]
    gdf_communes = gpd.GeoDataFrame(ids, geometry=polygons, crs='epsg:4326').to_crs(crs)
    geometry = gpd.points_from_xy(x, y)
    gdf_traces = gpd.GeoDataFrame(data = [[x] for x in range(len(x))], geometry=geometry, columns=['idd'], crs='epsg:4326').to_crs(crs)
    settlement = gpd.sjoin(gdf_traces,gdf_communes,how='left',predicate='within')#.fillna(np.nan)
    if settlement.shape[0] > 0:
        return settlement.groupby('idd').agg({join_column_name:lambda x: list(x)}).reset_index().sort_values(by='idd').loc[:,join_column_name].astype('str')# if pd.isnull(sum(x)) == False else np.nan}).reset_index().sort_values(by='id').loc[
    else:
        return ""
    

# function to calculate distance
def haversine(lat1, lon1, lat2, lon2):
    if lat1 == None or lat2 == None:
        return None
    else:
        return 2*6378000*sqrt(pow(sin((lat2-lat1)/2),2) + cos(lat1)*cos(lat2)*pow(sin((lon2-lon1)/2),2)) # result in meters
dist_udf=udf(haversine, DoubleType())

""" Run a single event study """
def eventStudies(temp, x_columns, schema, group_column = "id", y_column = 'kak'):
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    # Input/output are both a pandas.DataFrame
    def ols(pdf):
        import statsmodels.api as sm
        group_key = pdf[group_column].iloc[0]
        y = pdf[y_column]
        X = pdf[x_columns]
        X = sm.add_constant(X)
        model = sm.OLS(y.astype(float), X.astype(float), missing = "drop").fit()

        
        return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)

    beta = temp.groupBy(group_column).apply(ols)

    return beta.toPandas()

""" Function to estimate event studies in parallel """
def runEventStudies(fn, x_columns = None):
    # estimate event studies in a loop
    y_column = 'kak'
    group_column = "id"
    
    if x_columns is None:
        x_columns = [col for col in air.columns if "TYPE" in col] # dummies

    for sample in [""]:
        for outcome in ["distance_sum"]: # , "kmh_avg", "distance_point", "altitude_avg"]:
            for lg  in ["", "log"]:
                    # take logs of outcome
                    #temp = air.withColumn('kak', when(col("distance_point").isNotNull(), F.log(1+col('distance_point'))).otherwise(lit(None)))
                    temp = air.filter(col(outcome).isNotNull())

                    if outcome == "distance_point":
                        temp = temp.filter(col("distance_point") != 0)

                    if lg:
                        temp = temp.withColumn('kak',  F.asinh(col(outcome)))
                    else:
                        temp = temp.withColumn('kak',  col(outcome))

                    if sample == "_sample": # filter devices that appeared in all three periods
                        temp = temp.filter(col("count") == 3)

                    # df has four columns: id, y, x1, x2
                    fields = [StructField(group_column, StringType(), False)] + [StructField(col, DoubleType(), False) for col in x_columns]
                    schema = StructType(fields)

                    coeffs = eventStudies(temp, x_columns, schema)


                    out = data_dir + fn + "/" +  "coeffs_" + outcome + "_" + lg + sample + ".csv"
                    coeffs.to_csv(out, index = False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Wrangle data into caid-by-minute panel format with start and end date of raids for corresponding minutes

In [2]:
# broadcast air raids shapefile
df_gp = gpd.read_file(data_dir + "air_geos")
df_gp = df_gp[df_gp.geometry != None]
df_gp = df_gp.set_crs(crs = "epsg:4326")
df_gp = df_gp.to_wkt()
gp_spark = spark.createDataFrame(df_gp)
df_gp = gp_spark.toJSON().collect()
gdf_broadcast = spark.sparkContext.broadcast(df_gp)

air = spark.read.csv(data_dir + "air_raid_alerts.csv", header = True).select("start_dt", "end_dt", "region_english")


dist_udf=udf(haversine, DoubleType())

# window to calculate device-level speed and distance
w1 = Window.partitionBy('caid').orderBy('utc_timestamp')
w = Window.partitionBy('caid', 'minute')

# set schemas for efficiency
essential_fields = [
        StructField("utc_timestamp",LongType(),True),
        StructField("caid",StringType(),True),
        StructField("latitude",DoubleType(),True),
        StructField("longitude",DoubleType(),True),
        StructField("altitude",DoubleType(),True)
]
raw_schema = StructType(
    essential_fields + [
        StructField("id_type",StringType(),False),
        StructField("geo_hash",StringType(),False),
        StructField("horizontal_accuracy",DoubleType(),False),
        StructField("ip_address",StringType(),False),
        #StructField("altitude",DoubleType(),False),
        StructField("iso_country_code",StringType(),False)]
)

# load data, calculate columns of interest, and aggregate to minute level
paths = ["s3://ipsos-dvd/ukr/data/pings_2022-01-01_2022-09-30/"]

join_column_name = "region_eng" # for the sjoin udf

pings = (spark.read.schema(StructType(essential_fields)).parquet(*paths)#.limit(1000000)
        .select("caid", "latitude", "longitude", "utc_timestamp", "altitude",)
        # .filter(col("horizontal_accuracy") <= 150) # 150m or less horizontal accuracy (similar to safegraph)
        # .drop(col('horizontal_accuracy'))
        #.filter(col('longitude').between(east,west) & col('latitude').between(north, south))
        .withColumn('date', F.from_utc_timestamp(col('utc_timestamp').cast(dataType=TimestampType()), 'GMT+3')) # date column
        .withColumn('day', F.to_date(col("date")))
        .withColumn('minute', F.date_trunc("minute", col("date"))) # minute column
        .drop("date")
        # calculate speed and distance
        .withColumn('latitude_lag', lag(col('latitude')).over(w1)).withColumn('longitude_lag', lag(col('longitude')).over(w1)).fillna(0)
        .withColumn('distance', when(col('latitude_lag') != lit(0), dist_udf(pi * col('latitude_lag') / 180, pi * col('longitude_lag') / 180, 
                                                                            pi * col('latitude') / 180, pi * col('longitude') / 180)).otherwise(lit(None)))
        .withColumn('time_diff', (col('utc_timestamp') - lag('utc_timestamp',1).over(w1)))
        .withColumn('kmh', 3.6 * col('distance') / col('time_diff')) # 1 m/s is 3.6 km/h
        .filter(col('kmh') <= 300) # filter jumpy pings
        # group by device-minute
        .groupBy('caid', 'minute', 'day')
        .agg(F.avg('kmh').alias('kmh_avg'),
            F.avg('distance').alias('distance_avg'),
            F.sum('distance').alias('distance_sum'),
            F.avg('longitude').alias('longitude_center'),
            F.avg('latitude').alias('latitude_center'),
            F.avg("altitude").alias("altitude_avg")
        )
        # spatial match with shapefile of air raids
        # note: some pings will be matched to multiple regions as some of the air raid regions are nested
        # it doesn't really matter however, as we can just say the ping is exposed to a raid whenever any of the regions it is in have one
        .withColumn("air_geo", sjoin(F.col("longitude_center"),F.col("latitude_center")))
        .withColumn('air_geo',  F.explode(F.split(regexp_replace(col("air_geo"), "[\[\],\']", "").alias('replaced'), ','))) # expand list of matched regions
        .filter(col("air_geo") != "nan")
        .join(air, col("air_geo") == air.region_english, "left").withColumn("alert", when(col("minute").between(col("start_dt"), col("end_dt")), 1).otherwise(0))
        .withColumn("start_raid", when(col("alert") == 1, col("start_dt")).otherwise("2025-01-01 00:00:00"))
        .withColumn("start_raid_min", F.min("start_raid").over(w))
        .withColumn("end_raid", when(col("alert") == 1, col("end_dt")).otherwise("1970-01-01 00:00:00"))
        .withColumn("end_raid_max", F.max("end_raid").over(w))
        .drop("start_dt", "end_dt", "start_raid", "end_raid").dropDuplicates(["caid","minute"])
        .withColumn("start_raid_min", when(col("start_raid_min") == "2025-01-01 00:00:00", 'NA').otherwise(col("start_raid_min")))
        .withColumn("end_raid_max", when(col("end_raid_max") == "1970-01-01 00:00:00", 'NA').otherwise(col("end_raid_max")))
        .drop("air_geo")
        )
# pings.show()
pings.write.option("header", True).mode("overwrite").partitionBy("day").parquet(data_dir + "pings_minute_panel_v5")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# intersect with cities
paths = [data_dir + fn for fn  in ["pings_minute_panel_v5"]]
air_raw = spark.read.parquet(*paths)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# broadcast cities shapefile
join_column_name = "city"
df_gp = gpd.read_file(data_dir + "cities")
df_gp = df_gp[df_gp.geometry != None]
df_gp = df_gp.set_crs(crs = "epsg:4326")
df_gp = df_gp.to_wkt()
gp_spark = spark.createDataFrame(df_gp)
df_gp = gp_spark.toJSON().collect()
gdf_broadcast = spark.sparkContext.broadcast(df_gp)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
air_raw = air_raw.withColumn("city", sjoin(F.col("longitude_center"),F.col("latitude_center")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
air_raw = (air_raw.withColumn('city',  F.explode(F.split(regexp_replace(col("city"), "[\[\],\']", "").alias('replaced'), ','))) # expand list of matched regions
                .withColumn("rural", F.when(col("city") == "nan", 1).otherwise(0)))
air_raw.write.option("header", True).mode("overwrite").partitionBy("day").parquet(data_dir + "pings_minute_panel_v5_cities")
           

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Run event study regressions for each alert with enough pings (~2K) in pandas_udf

In [4]:
def prepData(hours = 0.5, extend = False):

    paths = [data_dir + fn for fn  in ["pings_minute_panel_v5_cities"]]
    air_raw = spark.read.parquet(*paths)

    #print(air_raw.count())
    #filter devices that cover large distance in a day to account for refugee flows
    temp = (air_raw.groupBy('caid', 'day').agg(F.sum(col('distance_sum')).alias('distance_sum'))
            .filter(~((col('distance_sum') >0) & (col('distance_sum') < 100000) )) # devices have to move at least once, but not more than 100 km
           )
    air_raw = air_raw.join(temp, on = ["caid", "day"], how = "leftanti")


    #print(air_raw.count())
    air = air_raw.alias('air')

    # code pre-period
    w1 = Window.partitionBy("caid").orderBy(F.desc("minute"))



    #hours = 0.5

    air = (air.filter(col("region_english").isNotNull())
             .withColumn("start_raid_min", when(col("start_raid_min") == "NA", lit(None)).otherwise(F.date_trunc("minute",col("start_raid_min")))) # encode null values and truncate air raid start to minute
             .withColumn("start_raid_next", last(col('start_raid_min'), ignorenulls = True).over(w1)) # take the next (in terms of time) non-na value of start_raid to get the start of the next raid
             .withColumn("time_to_raid", col("minute").cast("long") - col("start_raid_next").cast(TimestampType()).cast("long")) # compute seconds to time raid, will be in multiples of 60 bc of truncation above
             .filter(col("time_to_raid").isNotNull())
             .withColumn("id", F.concat(col("start_raid_next"), col("region_english")))
          )
    
    if extend:
        # only retain alerts longer than 60 minutes
        alerts = pd.read_csv(data_dir + "air_raid_alerts.csv")
        alerts['start_dt'] = pd.to_datetime(alerts['start_dt'])
        alerts['duration'] =  (pd.to_datetime(alerts['end_dt']) - alerts['start_dt']).dt.total_seconds() / 60
        alerts = alerts[alerts.duration > 60*hours]
        alerts['start_raid_next'] = alerts.start_dt.dt.floor('min')
        alerts['region_english'] = alerts.region_english.astype('str')
        alerts = spark.createDataFrame(alerts[['region_english', 'start_raid_next']])
        
        air = air.join(alerts.select('region_english', 'start_raid_next'), how = "inner", on = ['region_english', 'start_raid_next'])
    
    air = air.cache()

    # create distance from point in 24 hours before alert, 86400
    temp = (air.filter((col("time_to_raid").between(-86400,-0.1)) & (col("latitude_center") != 0)).groupBy("caid", "id").
                agg(F.max(col("time_to_raid")).alias("time_to_raid")))
    temp2 = air.select("caid", "id", "time_to_raid", "latitude_center", "longitude_center").join(temp, on = ['caid', 'id', 'time_to_raid'], how = "leftsemi")
    temp2 = temp2.withColumnRenamed("latitude_center", "latitude_pre").withColumnRenamed("longitude_center", "longitude_pre").drop("time_to_raid")
    air = air.join(temp2, on = ['caid', 'id'], how = "left")
    air = air.withColumn("distance_point", when((col("latitude_center") != 0) & (col("latitude_pre") != 0), dist_udf(pi * col('latitude_center') / 180, pi * col('longitude_center') / 180, 
                                                                                 pi * col('latitude_pre') / 180, pi * col('longitude_pre') / 180)
                                               ).otherwise(lit(None))
    )


    air = air.filter(col("time_to_raid").between(-3600*hours, 3600*hours)) # balance x hours around the event for now


    w2 = Window.partitionBy("start_raid_next", "region_english")
    #air.withColumn("raid_count", F.countDistinct("minute").over(w2)).agg({'raid_count' : 'max'}).first()[0]
    temp = air.groupBy("start_raid_next", "region_english").agg(F.countDistinct("minute").alias("minute_count"))

    air = air.filter(col("region_english").isNotNull())
    air = (air.join(temp, ['start_raid_next', 'region_english'], "left")
               .filter(col("minute_count") == 60*hours*2+1) # need each minute in the two-hour window around the event to appear for at least one device 
          )

    # dummy whether device is observed in minute
    air = air.cache()


    # create dummy columns
    air = air.withColumn("time_to_raid_post", when(col("time_to_raid") >= -600, col("time_to_raid")).otherwise(-1)) # we only want post dummies
    types = air.select(col("time_to_raid_post").cast(StringType())).distinct().rdd.flatMap(lambda x: x).collect()
    types_expr = [F.when(F.col("time_to_raid_post") == ty, 1).otherwise(0).alias("e_TYPE_" + ty) for ty in types]
    air = air.select([col for col in air.columns] + [*types_expr]).drop("e_TYPE_-1")

    return air

    # # calculate average distance from point before alert
    # temp = air.filter(col("time_to_raid") < 0).groupBy("caid", "id").agg(F.avg(col("distance_point")).alias("distance_point_pre"))
    # air = air.join(temp, on = ['caid', 'id'], how = "left")

    # filter only devices that were present in both pre and post period
    #air = air.filter(col("distance_point") != 0) #devices that are pinging from the exact same location all day (since otherwise impossible to be exactly == to average)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
## 30 minute window
air = prepData()

air = (air.withColumn("month", F.month("day")).withColumn("period", 
                                                         when(col("month").between(3,4), 1)
                                                            .otherwise(when(col("month").between(5,6), 2)
                                                            .otherwise(when(col("month").between(7,9), 3))))
      )
temp = air.groupBy("caid").agg(F.countDistinct("period").alias("count"))
air = air.join(temp, on = "caid", how = "left")

runEventStudies("estimates_filtered")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [8]:
## urban vs. rural
pres = air.alias("pres")
air = air.filter(F.col("rural") == 1)
runEventStudies("estimates_filtered_rural")

air = pres.alias("air")
air = air.filter(F.col("rural") == 0)
runEventStudies("estimates_filtered_urban")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [4]:
## 60 minute window
for hr in [1, 4]:
    air = prepData(hours = hr, extend = True)
    air = air.cache()
    air = (air.withColumn("month", F.month("day")).withColumn("period", 
                                                             when(col("month").between(3,4), 1)
                                                                .otherwise(when(col("month").between(5,6), 2)
                                                                .otherwise(when(col("month").between(7,9), 3))))
          )
    temp = air.groupBy("caid").agg(F.countDistinct("period").alias("count"))
    air = air.join(temp, on = "caid", how = "left")

    runEventStudies("estimates_filtered_" + str(60*hr))
    
    air = air.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[caid: string, start_raid_next: timestamp, region_english: string, id: string, day: date, minute: timestamp, kmh_avg: double, distance_avg: double, distance_sum: double, longitude_center: double, latitude_center: double, altitude_avg: double, alert: int, start_raid_min: timestamp, end_raid_max: string, city: string, rural: int, time_to_raid: bigint, latitude_pre: double, longitude_pre: double, distance_point: double, minute_count: bigint, time_to_raid_post: bigint, e_TYPE_2700: int, e_TYPE_4980: int, e_TYPE_10440: int, e_TYPE_3720: int, e_TYPE_3060: int, e_TYPE_14100: int, e_TYPE_12840: int, e_TYPE_5640: int, e_TYPE_4080: int, e_TYPE_12300: int, e_TYPE_10200: int, e_TYPE_12060: int, e_TYPE_4560: int, e_TYPE_9720: int, e_TYPE_300: int, e_TYPE_3600: int, e_TYPE_9060: int, e_TYPE_11820: int, e_TYPE_1860: int, e_TYPE_13920: int, e_TYPE_1200: int, e_TYPE_60: int, e_TYPE_3540: int, e_TYPE_11220: int, e_TYPE_8940: int, e_TYPE_4380: int, e_TYPE_2160: int, e_TYPE_11040: int, e_TYPE_240

#### Two-wall rule

Home locations

In [39]:
air = prepData()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
air = (air.withColumn("month", F.month("day")).withColumn("period", 
                                                         when(col("month").between(3,4), 1)
                                                            .otherwise(when(col("month").between(5,6), 2)
                                                            .otherwise(when(col("month").between(7,9), 3))))
      )
temp = air.groupBy("caid").agg(F.countDistinct("period").alias("count"))
air = air.join(temp, on = "caid", how = "left")

air = air.cache()

# read homes data
homes = spark.read.parquet("s3://ipsos-dvd/ukr/data/homes_2022-01-01_2022-12-31/")

# get closest month (the home dates are 30 days apart so adding 5 days makes sure each date is in a unique month)
homes = homes.withColumn("month", F.month(F.date_add(F.col("start_date"),5)))
homes = homes.withColumnRenamed("longitude", "longitude_home").withColumnRenamed("latitude", "latitude_home")

# join in homes data
air = air.withColumn("month", F.month(col("start_raid_next"))).join(homes, on = ["month", "caid"], how = "left")

# calculate distance from home
air = air.filter(col("longitude_home").isNotNull())
air = air.withColumn("distance_home", 
                     dist_udf(pi * col('latitude_center') / 180, pi * col('longitude_center') / 180, 
                                    pi * col('latitude_home') / 180, pi * col('longitude_home') / 180)
                    )

# get min distance from home by device and alert
temp = air.groupBy("caid", "id").agg(F.min("distance_home").alias("distance_home_min"))
air = air.join(temp, on = ['caid', 'id'], how = "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# device had to be at least 100 meters away from home at any time during the alert window
air = air.filter(col('distance_home_min') > 100)

runEventStudies("estimates_filtered_away_from_home")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Travel speed

In [43]:
air = prepData()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
# get min speed by device by alert
temp = air.groupBy("caid", "id").agg(F.min("kmh_avg").alias("kmh_avg_min"))
air = air.join(temp, on = ['caid', 'id'], how = "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
air = air.filter(col('kmh_avg_min') > 0.3) #.agg(F.percentile_approx(col('kmh_avg_min'), 0.90)).show()
runEventStudies("estimates_filtered_moving")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Interactions

#### Calculate distance to front

In [9]:
### get distance from front line
@udf(returnType = DoubleType())
def polydistance(point_x, point_y, poly):
    from shapely import wkt
    from shapely import geometry
    polygon = wkt.loads(poly)
    point = geometry.Point(point_x, point_y)
    return polygon.distance(point)
    
# read frontline shapefile
front = spark.read.csv(data_dir + "frontline.csv", header = True)
front = front.dropDuplicates(["date"])
front = front.withColumnRenamed("date", "day")

air = air.join(front, on = "day")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
pres = air.alias("pres")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
temp = air.dropDuplicates(["caid", "id"]).select("longitude_center", "latitude_center", "geometry", "caid", "id")
temp = (temp.withColumn("distance_to_front", polydistance(col("longitude_center"), col("latitude_center"), col("geometry")))
            .select("caid", "distance_to_front", "id"))
air = air.join(temp, on = ["caid", "id"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
air = air.drop("geometry")
air.write.option("header", True).mode("overwrite").csv(data_dir + "distance_to_front")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Run distance-to-front interacted regressions

In [89]:
air = spark.read.csv(data_dir + "distance_to_front", header = True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [90]:
w = Window.partitionBy("id")
air = air.cache()
air = air.withColumn("distance_to_front", col("distance_to_front") * 111139 / 1000) # column is in degrees
air = air.withColumn("distance_to_front_log", F.log(1+col("distance_to_front")))
air = air.withColumn("distance_med", (F.col("distance_to_front") >= F.percentile_approx("distance_to_front", 0.5).over(w)).cast(IntegerType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
# filter Donetsk and Luhansk
air = air.filter(~ (col("id").contains("Donetsk") | col("id").contains("Luhansk")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [92]:
# areas with less than 50km variation
temp = air.groupBy("id").agg((F.max("distance_to_front") - F.min("distance_to_front")).alias('spread'))
air = air.join(temp, on = "id", how = "left").filter(col("spread") >= 50).drop("spread")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
# areas with minimum number of observations
temp = air.groupby("id").agg(F.count("caid").alias("obs"))
air = air.join(temp, on = "id", how = "left").filter(col("obs") >= 200).drop("obs")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
pres = air.alias("pres")
air = air.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [95]:
air = air.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
# create post dummy
air = air.withColumn("post", (col("time_to_raid") >= 0).cast(IntegerType()))
air = air.withColumn("post_inter", col("time_to_raid") * col("distance_to_front_log"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [97]:
# create dummy interactions
inter_var = "distance_to_front"
x_columns = [col for col in air.columns if "TYPE" in col] # dummies
types_expr = [F.col(xcol) * F.col(inter_var) for xcol in x_columns]
air = air.select([col for col in air.columns] + [*types_expr])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [98]:
x_columns = [col for col in air.columns if "TYPE" in col] + [inter_var] # dummies
#x_columns = ["post", "post_inter", "distance_to_front_log"]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
runEventStudies("estimates_interaction_filtered", x_columns)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…