# Imports

In [179]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import json
from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,IntegerType, StructType,StringType
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql import functions as sf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import isnan, when, count, col , split

# Functions

In [265]:
def detect_year(begin_value, end_value, dict_years, ci_id):
    ls_year = list(range(begin_value.year, end_value.year+1))
    filter_year = pd.Series(ls_year)[ pd.Series(ls_year).isin(list(range(2014,2019))) ]
    for yr in filter_year:
        if ci_id not in dict_years[yr]:
            dict_years[yr].append(ci_id) 
    return dict_years

def remove_empty_ls(dct):
    if ((pd.Series(dct.values()).str.len()==0).sum()>=1):
        bol = pd.Series(dct.values()).str.len() >=1
        dct_filter = dict( list(zip(pd.Series(dct.keys())[bol], pd.Series(dct.values())[bol])) )
        return dct_filter
    else:
        return dct

def get_dct_by_year(dct,yr_value):
    dct_by_yr = dict()
    for comp_id, dct_years in dct.items():
        try:
            dct_by_yr[comp_id] = dct_years[yr_value]
        except Exception as e:
            pass
    return dct_by_yr

# Load Dataset

In [216]:
df_manager = pd.read_csv('/home/user/Desktop/files_desktop/DATA_managers/adm2018.csv')

In [206]:
df_manager.head(3)

Unnamed: 0,anio,expediente,ingresos,activos,patrimonio,utilidad_perdida,cantidad_epleados,fecha_ejecucion,fecha_actualizacion,adm_cedula,administrador,adm_fnombramiento,adm_ccargo,cargo,adm_periodo,adm_ftermino,adm_frmercantil
0,2018,1,142310.91,1337788.57,1295310.23,10245.38,12.0,2019-06-18 02:32:57.053000,2019-06-18 03:17:01.126000,1701093039,LLERENA ALBUJA TERESA,2008-03-29 15:13:33.886000,1,GERENTE GENERAL,4.0,2012-03-03 16:28:21.526000,2008-04-08 00:00:00
1,2018,10000,39118392.9,27482362.9,7449106.56,1530469.95,1858.0,2019-06-18 02:34:47.153000,2019-06-18 03:17:02.703000,1706450655,MONTALVO PAREDES PABLO SANTIAGO,2014-07-29 08:50:29.166000,32,PRESIDENTE DEL DIRECTORIO,3.0,2016-01-04 10:26:39.920000,2014-08-07 00:00:00
2,2018,100009,1275257.93,488437.09,147892.77,585.75,111.0,2019-06-18 02:46:54.700000,2019-06-18 03:17:17.590000,922066337,DOMINGUEZ ESPINOZA STEPHANIA ELIZABETH,2017-02-15 12:24:44.593000,30,PRESIDENTE,2.0,,2017-02-17 00:00:00


In [217]:
df_manager.isna().sum()

anio                        0
expediente                  0
ingresos                    0
activos                     0
patrimonio                  0
utilidad_perdida            0
cantidad_epleados           0
fecha_ejecucion             0
fecha_actualizacion         0
adm_cedula                  0
administrador             780
adm_fnombramiento           0
adm_ccargo                  0
cargo                       0
adm_periodo              2276
adm_ftermino           117444
adm_frmercantil            25
dtype: int64

# Fix columns format

In [218]:
df_manager["adm_ftermino"].fillna("2019-06-18", inplace=True)
df_manager['date_begin'] = pd.to_datetime((df_manager.adm_fnombramiento.str.split().str[0]), infer_datetime_format=True)
df_manager['date_end'] = pd.to_datetime((df_manager.adm_ftermino.str.split().str[0]), infer_datetime_format=True)
df_manager['expediente'] = df_manager.expediente.astype(str)
df_manager['adm_cedula' ]= df_manager.adm_cedula.str.strip()

# Filter dataset

- Part1: Records between the years 2014 -2018
- Part2: Records of those who started working outside the range of years (2014-2018) and finished their work activities within the range of years analyzed.
- Part3: Records of those who started working between the range of years (2014-2018) and until now they are working (2019).
- Part4: Records of those who started working outside the range of years (2014-2018) and until now they are working (2019).

In [219]:

part1 = df_manager[  ((df_manager.date_end < '2019-01-01') & (df_manager.date_end >= '2014-01-01')) & 
((df_manager.date_begin < '2019-01-01') & (df_manager.date_begin >= '2014-01-01'))]
part2 = df_manager[  ((df_manager.date_end < '2019-01-01') & (df_manager.date_end >= '2014-01-01')) & 
( (df_manager.date_begin < '2014-01-01') )]

part3 = df_manager[  ( (df_manager.date_end == '2019-06-18') ) & 
((df_manager.date_begin < '2019-01-01') & (df_manager.date_begin >= '2014-01-01'))]
part4 = df_manager[  ( (df_manager.date_end == '2019-06-18') ) & 
( (df_manager.date_begin < '2014-01-01') )]

df_manager_final = pd.concat([part1, part2, part3, part4 ], ignore_index=True)

# Load Data of companies

In [16]:
# dk_firms = dd.read_csv('/home/user/Desktop/files_desktop/DATA_managers/BASE.csv')
# dk_firms = dk_firms.rename(columns={'Año':'anio'})
# dk_firms.columns = pd.Series(dk_firms.columns).str.lower()

# # filter columns
# only_firm_by_sector_dk = ((dk_firms[['expediente','sector']]).drop_duplicates(subset=['expediente']))

In [220]:
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "20g") \
    .appName('MyFirstCSVLoad') \
    .getOrCreate()
dk_firms = spark.read.format("csv").load("/home/user/Desktop/files_desktop/DATA_managers/BASE.csv", header = 'true')

In [221]:
only_firm_by_sector_spk = (dk_firms.select(col('expediente'),col('rama_actividad')))\
    .dropDuplicates(['expediente'])

In [222]:
only_firm_by_sector_df = only_firm_by_sector_spk.toPandas()

                                                                                

# Update datasets with the same expediente ids for both dataset (records of administrators and finance data)

In [223]:
df_manager_update = df_manager_final[df_manager_final.expediente.isin(only_firm_by_sector_df.expediente)]

df_rama_exped_id = only_firm_by_sector_df[only_firm_by_sector_df.expediente.isin(df_manager_update.expediente)]\
    .groupby(['rama_actividad'])\
        .apply( lambda x: list(set(x['expediente'])) )\
            .to_frame('expediente_id')\
                .reset_index()

# Form dictionary with *rama_actividad* as keys and a list of firms.
## E.g
```
{'A': ['159855',
  '9064',
  '301751',
  '170521',
  '709746'],
...,
 'T': ['148299', '6693'],
 'U': ['175145'],
 'Z': ['166147', '166095', '159545', '700555']
}
```

In [224]:
dict_rama_exped_id = dict(list(zip(list(df_rama_exped_id.rama_actividad), list(df_rama_exped_id.expediente_id))))
# save dict as json file
with open('/home/user/Desktop/files_desktop/DATA_managers/dict_rama_exped_id.json', 'w',encoding='utf-8') as f:
    json.dump(dict_rama_exped_id, f, ensure_ascii=False, indent=4)

# df_final.expediente.drop_duplicates().isin(only_firm_by_sector_df.expediente).sum() # 60580, 60538
# Opening JSON file
f = open('/home/user/Desktop/files_desktop/DATA_managers/dict_rama_exped_id.json')
# returns JSON object as a dictionary
data = json.load(f)

In [266]:
dict_firm_manager_by_year = dict()

for i,row in df_manager_update.iterrows():
    if row['expediente'] not in dict_firm_manager_by_year.keys():
        anio_dict = { 2014:[],2015:[],2016:[],2017:[],2018:[]}
        dict_results_yr = detect_year(row['date_begin'], row['date_end'], anio_dict, row['adm_cedula']  )
        dict_firm_manager_by_year[row['expediente']] = dict_results_yr
    else:
        anio_dict = dict_firm_manager_by_year[row['expediente']]
        dict_results_yr = detect_year(row['date_begin'], row['date_end'], anio_dict, row['adm_cedula'] )
        dict_firm_manager_by_year[row['expediente']] = dict_results_yr


  filter_year = pd.Series(ls_year)[ pd.Series(ls_year).isin(list(range(2014,2019))) ]


# Remove empty list by year

In [367]:
dict_firm_manager_by_year_update = {firm_id: remove_empty_ls(dct_yr) for firm_id, dct_yr in dict_firm_manager_by_year.items() }

# Form dict of firm by year to estimate common managers

In [382]:
manager_by_firms_2014,manager_by_firms_2015,manager_by_firms_2016,manager_by_firms_2017,manager_by_firms_2018 =  [ get_dct_by_year(dict_firm_manager_by_year_update, i) for i in range(2014,2019)]

In [383]:
manager_by_firms_2014

{'10000': ['1706450655', '1704446804', '1702768381', '1704903986'],
 '10003': ['1706563689', '1702090166', '1705324976'],
 '100138': ['0908881006', '1304323981', '0911221174', '0900427519'],
 '100235': ['0907171615', '0908911407', '0900919226'],
 '100253': ['0909779761', '1801667757'],
 '100330': ['0915765655', '1200903829'],
 '100390': ['0915932222', '0924310527', '0904268794'],
 '100538': ['1715246045', '1715271274'],
 '100740': ['0300802436', '0101248193'],
 '100812': ['0920317088', '0921991576'],
 '10098': ['1711874667', '1002223236'],
 '101400': ['0908027949', '0908956386', '0904582392'],
 '101601': ['0910457563', '0907663579'],
 '101612': ['1300263017', '0904010162', '0903159796'],
 '10212': ['1701983866', '1707347777', '1702385988'],
 '102266': ['1710078138', '0908244791'],
 '102355': ['0906290184', '0909808040'],
 '102366': ['0917568834', '0903447092'],
 '102697': ['0908890825',
  '0908890478',
  '0903299139',
  '0990794901001',
  '0902259936'],
 '103100': ['0911822203', '09150