# Import data from Google BigQuery and store it locally

## Imports and global declarations

In [1]:
!pip3 freeze > "../requirements.txt"
#!pip3 install -r "../requirements.txt"  # giving some error

from google.cloud import bigquery
import pandas as pd
import numpy as np
import pandas_gbq
import copy
import copy
import pickle
import datetime as dt
from datetime import timezone
from sklearn.preprocessing import StandardScaler
from sklearn.impute import KNNImputer
from sklearn.cluster import KMeans

# Raise both pandas display caps to 500 so long/wide frames are not truncated.
for _display_option in ('display.max_rows', 'display.max_columns'):
    pd.set_option(_display_option, 500)

print('Hello git')


DistributionNotFound: The 'google-cloud-bigquery-storage' distribution was not found and is required by the application

## Custom function

In [2]:
def GBQ_data(query_string, project='turing-230020'):
    """Run a SQL query on BigQuery and return the result as a DataFrame.

    Parameters
    ----------
    query_string : str
        SQL text submitted through ``bigquery.Client.query``.
    project : str, optional
        Project id handed to ``bigquery.Client``. Defaults to
        ``'turing-230020'``, the value this function used to hard-code, so
        existing single-argument calls behave as before.

    Returns
    -------
    pandas.DataFrame
        The query result converted with ``to_dataframe()``.
    """
    client = bigquery.Client(project=project)
    query_job = client.query(query_string)
    # result() waits for the job to finish before rows are converted.
    return query_job.result().to_dataframe()


## Download raw data

In [3]:
# Stack -> MCQ mapping.
# CTE `stack`: one row per (stack_demand_id, stack_demand_name) with the number
# of distinct MCQs/skills and their ids joined into comma-separated strings.
# Only rows with require=1 are kept and MCQ ids 107, 140, 142 and 237 are
# excluded (the `require_for_job=1` condition is commented out in the SQL).
# Final select: keeps a single row per aggregated mcq_id string, preferring the
# stack whose name sorts last (order by stack_name desc).
# NOTE(review): string_agg(distinct ...) has no ORDER BY, so the id order inside
# the string is presumably not guaranteed — confirm the de-duplication on the
# aggregated string behaves as intended.
stack_mcq_mapping_query = """with stack as(
SELECT stack_demand_id as stack_id, stack_demand_name as stack_name, 
count(distinct mcq_id) as num_mcq, 
count(distinct skill_id) as num_skill,
string_agg(distinct cast(mcq_id as string), ',') as mcq_id, 
string_agg(distinct cast(skill_id as string), ',') as skill_id
from `turing-230020.devdb_mirror.dv2_stack_demand_skills` 
left join `turing-230020.devdb_mirror.dv2_skill_mcq` using(skill_id) 
left join `turing-230020.devdb_mirror.dv2_stack_demand` using(stack_demand_id)
where require=1 and mcq_id not in (107,140,142,237) -- and require_for_job=1 
group by 1,2)

SELECT * except(rn) from (SELECT *, row_number() over (partition by mcq_id order by stack_name desc) as rn
from stack ) where rn=1 
"""

# Skill -> MCQ mapping, built as the UNION ALL of two branches:
#   1. skills that have more than one row in dv2_skill_mcq: named after
#      base_all_skills_v4.skill_name, MCQ ids joined into one comma-separated
#      string; one row is kept per aggregated mcq_id string, preferring the
#      shortest skill_name and then the lowest skill_id.
#   2. skills with a single row in dv2_skill_mcq: named after
#      dv2_challenge.challenge_name; one row is kept per mcq_id, preferring the
#      lowest skill_id.
# Output columns: skill_challenge_name, skill_id, mcq_id (string).
skill_mcq_mapping_query = """with skill_mcq_mapping as(
SELECT skill_name as skill_challenge_name, skill_id, mcq_id from(
SELECT * , row_number() over (partition by mcq_id order by length(skill_name), skill_id) as rn from(
SELECT skill_mcq.skill_id, base_skill.skill_name, 
string_agg(cast(skill_mcq.mcq_id as string), ',') mcq_id , 
from `turing-230020.devdb_mirror.dv2_skill_mcq` skill_mcq 
left join `turing-230020.devdb_mirror.base_all_skills_v4` base_skill on skill_mcq.skill_id=base_skill.id
where skill_id in(
select skill_id from `turing-230020.devdb_mirror.dv2_skill_mcq` GROUP by 1 having count(*)>1
) GROUP by 1,2)) where rn=1

UNION all

SELECT challenge_name as skill_challenge_name,skill_id, mcq_id from(
SELECT * , row_number() over (partition by mcq_id order by skill_id) as rn from(
SELECT skill_mcq.skill_id, ch_name.challenge_name, 
string_agg(cast(skill_mcq.mcq_id as string), ',') mcq_id , 
from `turing-230020.devdb_mirror.dv2_skill_mcq` skill_mcq 
left join `turing-230020.devdb_mirror.base_all_skills_v4` base_skill on skill_mcq.skill_id=base_skill.id
left join `turing-230020.devdb_mirror.dv2_challenge` ch_name on skill_mcq.mcq_id=ch_name.challenge_id
where skill_id in(
select skill_id from `turing-230020.devdb_mirror.dv2_skill_mcq` GROUP by 1 having count(*)<2
) GROUP by 1,2)) where rn=1
)

SELECT * from skill_mcq_mapping
"""

# Fetch both mapping tables from BigQuery.
skill = GBQ_data(skill_mcq_mapping_query)
stack = GBQ_data(stack_mcq_mapping_query)

# Skill/challenge names are stored without any spaces.
skill['skill_challenge_name'] = skill['skill_challenge_name'].str.replace(' ','')

# Hand-maintained corrections of the MCQ ids attached to a few names.
mcq_id_overrides = {
    'Python': '211',              # 107 is deprecated and catered in ml_mcq
    'Vue.js': '208',              # 140 is deprecated and catered in ml_mcq
    'DevOps': '236',              # 142 has same name DevOps
    'AmazonRedshift': '164,240',  # 237 has same name
}
for override_name, override_ids in mcq_id_overrides.items():
    skill.loc[skill['skill_challenge_name']==override_name, 'mcq_id'] = override_ids


def _csv_ids_to_int_set(csv_ids):
    """Turn a comma-separated id string such as '164,240' into {164, 240}."""
    return {int(token) for token in csv_ids.split(',')}


# Set form of the ids, used later for subset tests against developers' MCQs.
skill['mcq_id_set'] = skill['mcq_id'].apply(_csv_ids_to_int_set)
stack['mcq_id_set'] = stack['mcq_id'].apply(_csv_ids_to_int_set)

# Developer profile rows (all developer_detail columns, the country group and
# the length of resume_plain) for developers present in
# analytics_views.phase2_step1_dev_aggregated.
# The column alias `characters_in_reume` is spelled this way on purpose here:
# the column list further down selects it under exactly this name.
developer_detail_query = """
SELECT country.country_group, dd.*, length(dd.resume_plain) characters_in_reume 
from  devdb_mirror.developer_detail dd left join 
analytics_views.country_information country on dd.country_id=country.country_id
where user_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`);
"""
# Use the same key name (dev_id) as the other frames.
developer_detail = GBQ_data(developer_detail_query).rename(columns={'user_id': 'dev_id'})

# Two-way region split: LATAM versus everything else.
is_latam = developer_detail['country_group']=="Latin and South America"
developer_detail['region'] = np.where(is_latam, 'LATAM', 'RoW')
print(f" developer_detail Data shape is {developer_detail.shape} and unique developers are {developer_detail['dev_id'].nunique()}")

# All columns of analytics_views.phase1_dev_level_data, restricted to
# developers present in analytics_views.phase2_step1_dev_aggregated.
phase1_data_query = """
SELECT * from `turing-230020.analytics_views.phase1_dev_level_data` 
where dev_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`)
"""
phase1_data = GBQ_data(phase1_data_query)
# Row count vs. distinct dev_id count: equal numbers mean one row per developer.
print(f" phase1_data Data shape is {phase1_data.shape} and unique developers are {phase1_data['dev_id'].nunique()}")

# Availability rows from dv2_developer_availability for developers present in
# analytics_views.phase2_step1_dev_aggregated, with columns renamed to
# dev_id / latest_availability / updated_by / last_update_availability.
dev_availability_query = """
SELECT user_id as dev_id, answer as latest_availability ,  action_from as updated_by,
last_update as last_update_availability, notice_period
from `turing-230020.devdb_mirror.dv2_developer_availability`
where user_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`)
"""
dev_availability = GBQ_data(dev_availability_query)
# Row count vs. distinct dev_id count: equal numbers mean one row per developer.
print(f" dev_availability Data shape is {dev_availability.shape} and unique developers are {dev_availability['dev_id'].nunique()}")

# Phase-2 funnel data: the step1 aggregate left-joined (on dev_id) with the
# packet, interview and trials/engagements aggregates. Only developers with a
# non-null `cluster` in turing-dev-337819.pdsa.PDAS_P2_cluster are kept.
df_phase2_query = """
SELECT * FROM analytics_views.phase2_step1_dev_aggregated
LEFT JOIN analytics_views.phase2_step2_dev_packet_aggre using(dev_id)
LEFT JOIN analytics_views.phase2_step3_dev_interview_aggre using(dev_id)
LEFT JOIN analytics_views.phase2_step4_dev_trials_and_engagements using (dev_id)
where dev_id in (Select distinct dev_id from turing-dev-337819.pdsa.PDAS_P2_cluster where cluster is not null)
;"""

df_phase2 = GBQ_data(df_phase2_query)
# A missing count is treated as zero paying customers.
df_phase2['count_paying_cust'] = df_phase2['count_paying_cust'].fillna(0)
# Binary flag: 1 when the developer has at least one paying customer, else 0.
df_phase2['paying_cust'] = np.where(df_phase2['count_paying_cust'] > 0, 1 , 0)
print(f" df_phase2 Data shape is {df_phase2.shape} and unique developers are {df_phase2['dev_id'].nunique()}")

# One row per distinct developer_id found in ms2_negotiations, with a constant
# NTE_status = 1. Unlike the other queries in this cell, this one is not
# restricted to phase-2 developers; the left merge later drops the extras.
nts_query = """
SELECT distinct developer_id as dev_id, 1 as NTE_status  
FROM `turing-230020.devdb_mirror.ms2_negotiations`;
"""
nts = GBQ_data(nts_query)
print(f" nts Data shape is {nts.shape} and unique developers are {nts['dev_id'].nunique()}")

# Per-developer best scores on two challenges from dv2_challenge_submit:
#   lci : challenge_id = 201 -> attempts count and max(total_score_by_cases)
#   acc : challenge_id = 220 -> attempts count and max(total_score_by_cases)
# The two are combined with a FULL OUTER JOIN so a developer with only one of
# them is still present. The final `acc_lci_score` is acc_score when available
# and lci_score otherwise.
# The attempt counts and the intermediate `score` column are computed in the
# CTEs but are not part of the final select.
acc_lci_query = """
with lci as(
SELECT user_id as lci_user_id, count(*) lci_attempts, max(total_score_by_cases) lci_score 
FROM `turing-230020.devdb_mirror.dv2_challenge_submit` 
where challenge_id=201 and user_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`)
group by 1),

acc as(
SELECT user_id as acc_user_id, count(*) acc_attempts, max(total_score_by_cases) acc_score
FROM  `turing-230020.devdb_mirror.dv2_challenge_submit` AS dcs
WHERE  challenge_id = 220 and user_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`)
GROUP by 1
),

acc_lci as(
SELECT lci_user_id, acc_user_id,  COALESCE(lci_user_id, acc_user_id) user_id,
acc_score, lci_score, COALESCE(acc_score, lci_score) as score
from lci full outer join acc on lci.lci_user_id=acc.acc_user_id
)

SELECT user_id as dev_id, acc_score, lci_score, 
COALESCE(acc_score, lci_score) as acc_lci_score 
from acc_lci;
"""
acc_lci = GBQ_data(acc_lci_query)
print(f" acc_lci Data shape is {acc_lci.shape} and unique developers are {acc_lci['dev_id'].nunique()}")

# Skills listed per developer in tpm_developer_skill (phase-2 developers only):
# number of distinct skill ids plus the distinct ids and names, each joined
# into a comma-separated string. Names come from base_all_skills_v4.
self_dec_skills_query = """
SELECT developer_id as dev_id, count(distinct skill_id) num_skill, 
string_agg(distinct cast(skill_id as string), ',' ) as dev_skill_id,
string_agg(distinct skill_name, ',' ) as dev_skill_name
from(
SELECT dev.*, skill.skill_name from `turing-230020.devdb_mirror.tpm_developer_skill` dev
left join `turing-230020.devdb_mirror.base_all_skills_v4` skill on dev.skill_id = skill.id
where developer_id in (SELECT distinct dev_id from `analytics_views.phase2_step1_dev_aggregated`)
)
GROUP by 1;
"""
self_dec_skills = GBQ_data(self_dec_skills_query)
print(f" self_dec_skills Data shape is {self_dec_skills.shape} and unique developers are {self_dec_skills['dev_id'].nunique()}")

# Per-developer average of dv2_work_experience_avg_score.avg_score across the
# developer's challenge submissions (joined on submit_id), phase-2 developers
# only. The result column is renamed to `seniority_score` below.
# NOTE(review): `sns` is also the conventional alias for seaborn; the name is
# kept because later code merges this frame under that name.
sns_qeury = """
SELECT
  dcs.user_id AS dev_id,
  AVG(dweas.avg_score) AS sn_avg_score
FROM
  devdb_mirror.dv2_challenge_submit AS dcs
  LEFT JOIN devdb_mirror.dv2_work_experience_avg_score AS dweas ON dcs.submit_id = dweas.submit_id
  where user_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`)
GROUP BY
  dcs.user_id
"""
sns = GBQ_data(sns_qeury)
sns.rename(columns = {'sn_avg_score':'seniority_score'}, inplace=True)
print(f" sns Data shape is {sns.shape} and unique developers are {sns['dev_id'].nunique()}")


# Per-developer MCQ summary built from the machine-learning database.
#   per     : rows of prod.dev_mcq_score (read through external_query on the
#             "turing-230020.us.machine-learning" connection) joined to
#             dv2_skill_mcq / base_all_skills_v4, phase-2 developers only.
#   per_2   : remaps challenge_id 142 -> 236 and 237 -> 240.
#   dev_mcq : keeps one row per (dev_id, challenge_id), preferring the highest
#             dev_percentile and then the earliest last_updated_at.
# Final select, one row per developer, limited to challenge ids that appear as
# mcq_id in dv2_skill_mcq:
#   - attempted challenges (count + comma-separated ids),
#   - "passed" challenges, defined as dev_percentile > 50 (count, ids, names),
#   - problem / attempt / correct totals,
#   - the individual percentiles as a string and their mean.
ml_mcq_query = """with per as(
select
dms.dev_id, dms.challenge_name, dms.challenge_id, dms.problems_in_challenge, dms.num_attempted, dms.num_correct,
dms.probability_correct, dms.dev_rank, dms.dev_percentile, dms.dev_weight, dms.last_updated_at, dms.from_model, 
dsm.skill_id, dsm.mcq_id, bas4.skill_name, dms.skill_id as dms_skill_id
  from 
    external_query("turing-230020.us.machine-learning",
      "select * from prod.dev_mcq_score"
    ) as dms
  left join devdb_mirror.dv2_skill_mcq as dsm
    on dms.challenge_id = dsm.mcq_id
  left join devdb_mirror.base_all_skills_v4 as bas4
    on dsm.skill_id = bas4.id where dev_id in (SELECT distinct(dev_id) from `analytics_views.phase2_step1_dev_aggregated`)
    ),

per_2 as(
SELECT * except(challenge_id), 
case when challenge_id=142 then 236 when challenge_id=237 then 240 else challenge_id end as challenge_id
from per), 

dev_mcq as (
SELECT * except(rn) from (SELECT *, row_number() over (partition by dev_id, challenge_id order by dev_percentile desc, last_updated_at) as rn
from per_2 ) where rn=1 
)

SELECT dev_id, 
count(distinct challenge_id) as num_challenges,
string_agg(cast(challenge_id as string), ',') as challenge_ids,
count(case when dev_percentile > 50 then challenge_id else null END) as passed_num_challenges ,
string_agg(CASE WHEN dev_percentile > 50 then cast(challenge_id as string) else null END, ',') AS passed_challenge_ids,
string_agg(CASE WHEN dev_percentile > 50 then challenge_name else null END, ',') AS passed_challenge_name,
SUM(problems_in_challenge) as total_problems,
sum(num_attempted) as attempted_problems,
sum(num_correct) as num_correct,
string_agg(cast(dev_percentile as string), ',') as dev_percentile,
avg(dev_percentile) as mean_dev_percentile
from dev_mcq WHERE challenge_id in (SELECT distinct mcq_id from `turing-230020.devdb_mirror.dv2_skill_mcq`) 
GROUP by 1"""  

ml_mcq = GBQ_data(ml_mcq_query)
print(f" ml_mcq Data shape is {ml_mcq.shape} and unique developers are {ml_mcq['dev_id'].nunique()}")

# Split developers into "passed nothing" and "passed at least one challenge".
# Explicit copies are taken because new columns are assigned onto the second
# frame below; assigning onto a bare .loc slice raises SettingWithCopyWarning
# and is not guaranteed to behave consistently.
ml_mcq_zero_passed = ml_mcq.loc[ml_mcq['passed_num_challenges'] == 0].copy()
ml_mcq = ml_mcq.loc[ml_mcq['passed_num_challenges'] > 0].copy()

# Comma-separated id strings -> sets of ints, for subset tests further down.
for csv_column, set_column in (
    ('passed_challenge_ids', 'passed_challenge_ids_set'),
    ('challenge_ids', 'challenge_ids_att_set'),
):
    ml_mcq[set_column] = ml_mcq[csv_column].apply(
        lambda csv_ids: {int(token) for token in csv_ids.split(',')}
    )

# Placeholder columns, created with None so they are object-typed and can hold
# per-row counts as well as per-row lists. Column order matches the original.
for placeholder_column in (
    'num_passed_stack', 'num_attempted_stack',
    'num_passed_skill', 'num_attempted_skill',
    'passed_skill_id', 'passed_skill_name',
    'passed_stack_id', 'passed_stack_name',
):
    ml_mcq[placeholder_column] = None


# For every developer, find the stacks and skills whose required MCQ set is
# fully contained in the developer's passed (resp. attempted) challenge set.
#
# The stack/skill tables do not change inside the loop, so their values are
# looked up once here instead of once per (developer, stack/skill) pair; the
# pandas scalar lookups were the dominant cost of the original nested loop.
stack_lookup = [
    (stack['mcq_id_set'][k], stack['stack_id'][k], stack['stack_name'][k])
    for k in stack.index
]
skill_lookup = [
    (skill['mcq_id_set'][m], skill['skill_id'][m], skill['skill_challenge_name'][m])
    for m in skill.index
]

for i in ml_mcq.index:
    # Read this developer's two sets once per row.
    passed_set = ml_mcq.at[i, 'passed_challenge_ids_set']
    attempted_set = ml_mcq.at[i, 'challenge_ids_att_set']

    # Stacks: "passed" when every MCQ of the stack was passed, "attempted"
    # when every MCQ of the stack was at least attempted.
    passed_stacks = [
        (one_stack_id, one_stack_name)
        for required, one_stack_id, one_stack_name in stack_lookup
        if required.issubset(passed_set)
    ]
    ml_mcq.at[i, 'num_passed_stack'] = len(passed_stacks)
    ml_mcq.at[i, 'num_attempted_stack'] = sum(
        1 for required, _sid, _sname in stack_lookup if required.issubset(attempted_set)
    )
    ml_mcq.at[i, 'passed_stack_id'] = [one_stack_id for one_stack_id, _name in passed_stacks]
    ml_mcq.at[i, 'passed_stack_name'] = [one_stack_name for _id, one_stack_name in passed_stacks]

    # Skills: same rule applied to the skill -> MCQ mapping.
    passed_skills = [
        (one_skill_id, one_skill_name)
        for required, one_skill_id, one_skill_name in skill_lookup
        if required.issubset(passed_set)
    ]
    ml_mcq.at[i, 'num_passed_skill'] = len(passed_skills)
    ml_mcq.at[i, 'num_attempted_skill'] = sum(
        1 for required, _sid, _sname in skill_lookup if required.issubset(attempted_set)
    )
    ml_mcq.at[i, 'passed_skill_id'] = [one_skill_id for one_skill_id, _name in passed_skills]
    ml_mcq.at[i, 'passed_skill_name'] = [one_skill_name for _id, one_skill_name in passed_skills]

#print(f"ml_mcq shape {ml_mcq.shape}")

# Re-combine developers who passed at least one challenge (enriched with the
# stack/skill columns) with those who passed none; the latter get NaN in the
# columns that exist only on ml_mcq. The index is not reset here.
vetted_stack_skill = pd.concat([ml_mcq, ml_mcq_zero_passed])

# Column subsets used when assembling the combined table. Each list starts
# with 'dev_id', the merge key.
df_phase2_cols = ['dev_id', 'vetting_status', 'phase2_entry_date', 'phase2_entry_source',
             'count_jobs_suggested', 'count_jobs_packets_sent', 'count_jobs_selected_for_interview',
              'paying_cust','count_paying_cust', 'count_trials', 'count_engagements']

acc_score_cols = ['dev_id', 'acc_score', 'lci_score', 'acc_lci_score']

vetted_stack_skill_cols = ['dev_id', 'num_challenges', 'passed_num_challenges','total_problems','attempted_problems',
                           'num_correct','mean_dev_percentile', 'num_passed_stack', 'passed_stack_id', 'passed_stack_name',
                           'num_passed_skill','passed_skill_id', 'passed_skill_name', 'passed_challenge_ids_set',
                           'challenge_ids_att_set']

# NOTE(review): not referenced anywhere else in the visible part of this
# notebook — possibly a leftover; confirm before removing.
dev_detail_data_p1_cols = ['dev_id', 'signup_date', 'years_of_experience', 'words_in_resume',
'resume_upload_date', 'Region', 'source_attribution_type', 'channel', 'english_communication',
'full_name', 'email', 'hourly_rate']

# 'characters_in_reume' matches the (misspelled) alias produced by the
# developer_detail query.
dev_detail_cols = ['dev_id', 'years_of_experience', 'characters_in_reume',
                           'resume_upload_date', 'region', 'country', 'verbal_communication', 'hourly_rate']

p1_data_cols = ['dev_id', 'full_name', 'email' , 'signup_date','source_attribution_type', 'channel', 'user_os', 'user_os_type',
                           'quiz_answer', 'resume_flag', 'english_communication']


# NOTE(review): the three lists below are not referenced in the visible part
# of this notebook; the corresponding frames are merged whole.
dev_avaiability_cols = ['dev_id', 'latest_availability', 'updated_by', 'last_update_availability','notice_period']

seniority_score_cols = ['dev_id', 'seniority_score']

NTE_call_cols = ['dev_id', 'NTE_status']

self_dec_skills_cols = ['dev_id', 'num_skill', 'dev_skill_id', 'dev_skill_name']



# Assemble the combined developer table: start from the phase-2 funnel frame
# and left-join every other source on dev_id, so the row set is exactly the
# developers present in df_phase2.
global_data = (
    df_phase2[df_phase2_cols]
    .merge(acc_lci[acc_score_cols], how='left', on='dev_id')
    .merge(sns, how='left', on='dev_id')
    .merge(developer_detail[dev_detail_cols], how='left', on='dev_id')
    .merge(phase1_data[p1_data_cols], how='left', on='dev_id')
    .merge(dev_availability, how='left', on='dev_id')
    .merge(vetted_stack_skill[vetted_stack_skill_cols], how='left', on='dev_id')
    .merge(nts, how='left', on='dev_id')
    .merge(self_dec_skills[self_dec_skills_cols], how='left', on='dev_id')
)
print(f" Global Data shape is {global_data.shape} and unique developers are {global_data['dev_id'].nunique()}")

def _label_by_first_match(name_lists, needles, labels):
    """Label each row by the first needle found in its list of names.

    `name_lists` is a Series whose cells are lists of names. Row by row, the
    needles are tried in order and the label paired with the first needle that
    is an element of the row's list is returned; rows matching none of them
    get 'Other'.
    """
    conditions = [
        name_lists.apply(lambda names, needle=needle: needle in names)
        for needle in needles
    ]
    return np.select(conditions, labels, default='Other')


# Developers with at least one passed skill.
data_skill = global_data.loc[global_data['num_passed_skill']>0].copy()

data_skill['top_skill_demand'] = _label_by_first_match(
    data_skill['passed_skill_name'],
    ['React', 'NodeJS', 'Python', 'JavaScript'],
    ['React', 'NodeJS', 'Python', 'JavaScript'],
)

data_skill['top_skill_supply'] = _label_by_first_match(
    data_skill['passed_skill_name'],
    ['SQL', 'Git', 'Java', 'NodeJS'],
    ['SQL', 'Git', 'Java', 'NodeJS'],
)

# Developers with at least one passed stack.
data_stack = global_data.loc[global_data['num_passed_stack']>0].copy()

# Two of the labels below carry a suffix that the matched stack name lacks.
data_stack['top_stack_supply'] = _label_by_first_match(
    data_stack['passed_stack_name'],
    ['React Native', 'Python (Flask/Vue/Angular)', 'React + Backend', 'Android (Kotlin)'],
    ['React Native', 'Python (Flask/Vue/Angular)/Backend', 'React + Backend/Backend', 'Android (Kotlin)'],
)

#print(data_skill['top_skill_supply'].value_counts())
#print(data_skill['top_skill_demand'].value_counts())
#print(data_stack['top_stack_supply'].value_counts())

# Bring the top-skill / top-stack labels back onto the full table.
global_data = global_data.merge(data_skill[['dev_id','top_skill_supply','top_skill_demand']], how='left', on='dev_id')
global_data = global_data.merge(data_stack[['dev_id','top_stack_supply']], how='left', on='dev_id')

# Developers with MCQ results but zero passed challenges: explicit zero counts
# and "nothing passed" labels instead of missing values.
passed_no_challenge = global_data['passed_num_challenges']==0
for count_column in ('num_passed_stack', 'num_passed_skill'):
    global_data.loc[passed_no_challenge, count_column] = 0
for label_column, label_value in (
    ('top_skill_supply', 'No_skill_passed'),
    ('top_skill_demand', 'No_skill_passed'),
    ('top_stack_supply', 'No_stack_passed'),
):
    global_data.loc[passed_no_challenge, label_column] = label_value

# Developers who passed some challenges but no complete stack / skill.
# These masks are evaluated after the zero counts above have been written.
passed_no_stack = global_data['num_passed_stack']==0
global_data.loc[passed_no_stack, 'top_stack_supply'] = 'No_stack_passed'
passed_no_skill = global_data['num_passed_skill']==0
global_data.loc[passed_no_skill, 'top_skill_supply'] = 'No_skill_passed'
global_data.loc[passed_no_skill, 'top_skill_demand'] = 'No_skill_passed'

# Pass rates: per challenge and per question.
global_data['correct_per_challenge'] = global_data['passed_num_challenges']/global_data['num_challenges']
global_data['correct_per_questions'] = global_data['num_correct']/global_data['total_problems']

# Developers absent from the negotiations table get NTE_status 0.
global_data['NTE_status'] = global_data['NTE_status'].fillna(0)
print(f" Global Data shape is {global_data.shape} and unique developers are {global_data['dev_id'].nunique()}")


 developer_detail Data shape is (49834, 56) and unique developers are 49834
 phase1_data Data shape is (50021, 66) and unique developers are 50021
 dev_availability Data shape is (49022, 5) and unique developers are 49022
 df_phase2 Data shape is (49680, 45) and unique developers are 49680
 nts Data shape is (48606, 2) and unique developers are 48606
 acc_lci Data shape is (43901, 4) and unique developers are 43901
 self_dec_skills Data shape is (49987, 4) and unique developers are 49987
 sns Data shape is (49278, 2) and unique developers are 49278
 ml_mcq Data shape is (49003, 11) and unique developers are 49003
 Global Data shape is (49680, 54) and unique developers are 49680
 Global Data shape is (49680, 59) and unique developers are 49680


## Store and version raw data

In [4]:
# Persist the combined table only when dev_id is unique; otherwise just report
# the problem and write nothing.
if global_data['dev_id'].duplicated().any():
    print('Global Data has duplicated dev_id')
else:
    now = dt.datetime.now()
    # Date stamp used in the file name (day-month-two-digit-year).
    current_time = now.strftime("%d-%m-%y") # %H:%M:%S")
    global_data.to_csv('../data/raw/' + '1.0-um-data-prep-all-' + current_time+'.csv')
    # Report success only after the write has completed, so a failing
    # to_csv is never preceded by a misleading success message.
    print(f'Global Data of shape {global_data.shape} stored in a csv successfully')

Global Data of shape (49680, 59) stored in a csv successfully
