In [1]:
# import packages
import os
import dataclasses
from dataclasses import dataclass
import pandas as pd
import numpy as np
from datetime import date
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import itertools
import read_data as rd


In [2]:
# set environment folders
# Root folder of the input data (relative path); the parquet reads in this
# notebook build their file paths from it.
data_path = "../data"

In [5]:
# Count the row groups of every file in the US_EDUC folder.
# Only the parquet footer is read (pq.read_metadata), so no ParquetFile reader is
# kept open per file and the loop no longer rebinds the notebook-level `edus` name.
# The listing order of os.listdir is kept as-is, so the counts line up with file_list.
educ_dir = os.path.join(data_path, "US_EDUC")
file_list = os.listdir(educ_dir)

row_group_list = [
    pq.read_metadata(os.path.join(educ_dir, file_name)).num_row_groups
    for file_name in file_list
]

print(row_group_list)

[81, 82, 81, 82, 12, 15, 60, 8, 80, 15, 15, 83, 15, 11, 81, 15, 15, 15, 80, 15, 15, 11]


In [None]:

# List the names of all entries in the US_EDUC data folder.
os.listdir(f"{data_path}/US_EDUC")

In [23]:
# Load the table of unique user ids and order it by user_id;
# the sorted order is useful for filtering later on.
ids_path = f"{data_path}/unique_user_id_US_EDUC.parquet"
ids = pq.read_table(ids_path).sort_by("user_id")

# With the column sorted, its first and last entries are the smallest and largest id.
sorted_user_ids = ids.column("user_id")
min_id = sorted_user_ids[0].as_py()
max_id = sorted_user_ids[-1].as_py()

# Position of one specific user id inside the sorted column.
sorted_user_ids.index(697508942)

<pyarrow.Int64Scalar: 23843300>

In [5]:
# Report the id range and peek at the entry at position 100000 of the id column.
print(f"min id is, {min_id}")
print(f"max id is, {max_id}")
probe_id = ids.column("user_id")[100000]
print(probe_id)

min id is, 1000013
max id is, 2096705167
3655629


In [4]:
# Open the education parquet file as a ParquetFile reader.
edus_path = f"{data_path}/US_EDUC/US_EDUC_user_education.parquet"
edus = pq.ParquetFile(edus_path)
# Show how many row groups the file is split into.
edus.num_row_groups

60

In [5]:
# Load the first row group of the education file as a pandas DataFrame.
group0 = edus.read_row_group(0).to_pandas()

# Keep only the rows whose user_id lies in the closed range [min_id, max_id].
in_id_range = group0["user_id"].between(min_id, max_id)
group0 = group0[in_id_range]
print(group0.shape)

# Show the user_id column, then its distinct values in order of first appearance.
user_id_col = group0["user_id"]
print(user_id_col)

print(pd.unique(user_id_col))



(1048576, 14)
0          130452445
1          130452445
2          617479683
3          617479683
4          617479683
             ...    
1048571    353784875
1048572    540751562
1048573    540751562
1048574    540751562
1048575    540751562
Name: user_id, Length: 1048576, dtype: int32
[130452445 617479683 203520822 ... 279513580 353784875 540751562]


In [51]:
# Count the distinct users that have at least one row with education_number above 6.
off_df = group0.loc[group0["education_number"] > 6]
off_df["user_id"].nunique(dropna=False)

2682

In [50]:
# Print every row of group0 that belongs to one user, to inspect that user's records.
target_user = 355326980
print(group0.loc[group0["user_id"] == target_user])

        user_id                                     university_raw  \
1182  355326980                       Western Governors University   
1183  355326980           Syracuse University - University College   
1184  355326980                        Jones County Junior College   
1185  355326980                         City University of Seattle   
1186  355326980  Institute for Veterans and Military Families -...   
1187  355326980                             The Sourcing Institute   
1188  355326980             The University of Southern Mississippi   
1189  355326980                       Kapi‘olani Community College   
1190  355326980                         City University of Seattle   

                                        university_name  education_number  \
1182                              University of Phoenix                 4   
1183             syracuse university university college                 3   
1184                        jones county junior college             

In [None]:
# Build one rd.edu instance and update it from the group0 row labelled 0.
edu_row0 = group0.loc[0]
edu_test = rd.edu()
edu_test.update_value(edu_row0)
print(edu_test)

In [None]:
# Small 3x4 frame holding 0..11, used to try out row access in pandas.
grid = np.arange(12).reshape(3, 4)
df = pd.DataFrame(grid, columns=list("ABCD"))

print(df)

# A plain slice inside [] selects rows: this displays the first two rows.
df[:2]


In [None]:
# check group0 properties
# group0 was converted to a pandas DataFrame when it was loaded (.to_pandas()), so
# the Arrow Table methods used here (.field, .filter with an expression) do not
# exist on it. Build an Arrow table from the same rows under its own name instead.
group0_table = pa.Table.from_pandas(group0, preserve_index=False)
group0_table.field("user_id")
# Keep the rows whose user_id is below 323 and show how many there are.
expr = pc.field("user_id") < 323
sub0 = group0_table.filter(mask = expr, null_selection_behavior = "drop")
sub0.num_rows

In [None]:
# group0 is a pandas DataFrame, which has no .column(); wrap the user_id values in
# an Arrow array so that .index() (position of the first match, -1 when the value
# is absent) can be used on them.
group0_user_ids = pa.array(group0["user_id"])
print(group0_user_ids.index(111))
print(group0_user_ids.index(130452445))

In [None]:
# Select the rows of the id table whose user_id equals one specific id.
lookup_id = 130452445
id_expr = pc.field("user_id") == lookup_id
ids.filter(id_expr)

In [None]:
# test df construction time

numrows = ids.num_rows

# One factory per object column. `[obj] * numrows` stores the SAME instance in every
# row, so any in-place change made through one cell shows up in all rows of that
# column; calling the factory once per row gives every row its own object.
# NOTE(review): this allocates numrows objects per column, far heavier than the
# shared-instance version - confirm it fits in memory for the full id table.
column_factories = {
    "user_prof": rd.user,
    "skill": rd.skill,
    "edu1": rd.edu,
    "edu2": rd.edu,
    "edu3": rd.edu,
    "edu4": rd.edu,
    "pos1": rd.pos,
    "pos2": rd.pos,
    "pos3": rd.pos,
    "pos4": rd.pos,
}

# user_id comes straight from the id table; every other column holds fresh objects.
dt = {"user_id": ids.column("user_id")}
for column_name, make_default in column_factories.items():
    dt[column_name] = pd.Series([make_default() for _ in range(numrows)])
df = pd.DataFrame(dt)
# tb = pa.Table.from_pydict(dt)

In [None]:
# Print the dtype of every column of df.
dtype_report = df.dtypes
print(dtype_report)

In [25]:
# Open one user-profile file and load its row group 5 into pandas.
prof_path = f"{data_path}/US_EDUC/US_EDUC_user_part_1_0_249.parquet"
prof = pq.ParquetFile(prof_path)
prof0 = prof.read_row_group(5).to_pandas()
# Show the row(s) of this row group that belong to user 697508942.
prof0.loc[prof0["user_id"] == 697508942]





Unnamed: 0,user_id,firstname,lastname,fullname,f_prob,m_prob,white_prob,black_prob,api_prob,hispanic_prob,...,highest_degree,sex_predicted,ethnicity_predicted,profile_linkedin_url,user_location,user_country,profile_title,updated_dt,numconnections,profile_summary


In [None]:
# test time of one iteration before df assignment
# prof0 is a pandas DataFrame (it was loaded with .to_pandas()), and
# DataFrame.filter() selects labels rather than rows, so it rejects the
# mask=/expression arguments used here. Re-read the same row group (5) as an
# Arrow table and run the row filter on that.
prof0_table = prof.read_row_group(5)
expr = pc.field("user_id") == 600272782
thisuser = prof0_table.filter(mask = expr, null_selection_behavior = "drop")

# Fill a fresh rd.user instance from the first matching row.
test_prof = rd.user()
test_prof.update_value(thisuser.take([0]).to_pandas())



In [None]:
# Find the position of user 3655629 in the id column and print the object that df
# stores in the "user_prof" column at that row label.
id_index = ids.column("user_id").index(3655629).as_py()
print(id_index)
stored_prof = df.loc[id_index, "user_prof"]
print(stored_prof)


In [None]:
# Update the object stored at (id_index, "user_prof") from the first row of
# thisuser, then print the stored object again.
first_match = thisuser.take([0]).to_pandas()
df.loc[id_index, "user_prof"].update_value(first_match)
print(df.loc[id_index, "user_prof"])

In [None]:
# test time of just df assignment
# Overwrites the value stored at row label id_index of the "user_prof" column with test_prof.
# NOTE(review): the cell contains no timing code; presumably it was timed from outside
# (e.g. with a cell magic) - confirm.
df.loc[id_index, "user_prof"] = test_prof

In [None]:
# prof0 is a pandas DataFrame (loaded with .to_pandas()), which has neither .slice()
# nor .column(); use the pandas equivalent to show the first five user ids.
prof0["user_id"].head(5)

In [5]:
# Open one user_position file and read its first row group as an Arrow table.
pos_path = f"{data_path}/US_EDUC/US_EDUC_user_position_part_1_00_0_249.parquet"
pos = pq.ParquetFile(pos_path)
pos0 = pos.read_row_group(0)
# Only the last expression of a cell is displayed, so the schema lookup shows
# nothing and the cell output is the row count.
pos0.schema
pos0.num_rows

1048576

In [None]:
# Three peeks at pos0: the first value of column 0 (via a one-row slice), the name
# of column 0, and the first value of the column named "user_id".
pos_head = pos0.slice(0, 1)
print(pos_head.column(0)[0])
first_column_name = pos0.column_names[0]
print(first_column_name)
print(pos0.column("user_id")[0])

In [6]:
# Print every column of pos0 as "name , type", one column per line.
pos_schema = pos0.schema
for col_name, col_type in zip(pos_schema.names, pos_schema.types):
    print(col_name, ",", col_type)

user_id , int64
position_id , int64
company_raw , string
company_linkedin_url , string
company_cleaned , string
location_raw , string
region , string
country , string
state , string
metro_area , string
startdate , string
enddate , string
title_raw , string
role_k1500 , string
job_category , string
role_k50 , string
role_k150 , string
role_k300 , string
role_k500 , string
role_k1000 , string
remote_suitability , float
weight , float
description , string
start_salary , double
end_salary , double
seniority , int16
salary , float
position_number , int16
rcid , double
company_name , string
ultimate_parent_rcid , double
ultimate_parent_company_name , string
onet_code , string
onet_title , string
ticker , string
exchange , string
cusip , string
naics_code , string
naics_description , string
ultimate_parent_factset_id , string
ultimate_parent_factset_name , string
total_compensation , float
additional_compensation , float
title_translated , string


In [None]:
# Open one user_skill file, read its first row group, and show its first user_id.
skill_path = f"{data_path}/US_EDUC/US_EDUC_user_skill_part_0000_0049.parquet"
skill = pq.ParquetFile(skill_path)
skill0 = skill.read_row_group(0)
skill0.num_columns  # not displayed: a cell only shows its last expression
skill0.column("user_id")[0]

In [None]:
# Display the first three rows of the skill row group.
skill0.slice(0, 3)

In [9]:
# test mutating df
# Toy frame: integer columns a/b/d, an all-<NA> column c, and a string label column e.
df1 = pd.DataFrame(
    {
        "a": [1, 2, 3],
        "b": [2, 3, 4],
        "c": pd.Series([pd.NA] * 3),
        "d": [7, 7, 8],
        "e": ["drop", "may_drop", "no_drop"],
    }
)

print(df1)

# Take the first three rows, then count how many of them have d == 7.
sub_data = df1[:3]
rows_with_7 = sub_data[sub_data["d"] == 7]
print(len(rows_with_7))

   a  b     c  d         e
0  1  2  <NA>  7      drop
1  2  3  <NA>  7  may_drop
2  3  4  <NA>  8   no_drop
2


In [None]:
# Count down from 3 to 0, printing each value on its own line.
for i in reversed(range(4)):
    print(i)

In [57]:
# Column name -> default value for the user-profile frame, listed in column order.
# Keeping each name next to its default keeps the two tuples below aligned.
user_schema = {
    "user_id": 0,
    "firstname": "none",
    "lastname": "none",
    "fullname": "none",
    "f_prob": 0.0,
    "m_prob": 0.0,
    "white_prob": 0.0,
    "black_prob": 0.0,
    "api_prob": 0.0,
    "hispanic_prob": 0.0,
    "native_prob": 0.0,
    "multiple_prob": 0.0,
    "prestige": 0.0,
    "highest_degree": "none",
    "sex_predicted": "none",
    "ethnicity_predicted": "none",
    "profile_linkedin_url": "none",
    "user_location": "none",
    "user_country": "none",
    "profile_title": "none",
    "updated_dt": "none",
    "numconnections": 0,
    "profile_summary": "none",
    "updated": False,
}

# Parallel tuples of the column names and of their defaults.
user_var = tuple(user_schema)
user_default = tuple(user_schema.values())

# Three-row frame: every column starts at its default, then user_id is replaced
# by the first three ids of the id table.
dt = {
    field_name: [field_default] * 3
    for field_name, field_default in zip(user_var, user_default)
}
dt["user_id"] = ids.slice(length=3).column("user_id")
df = pd.DataFrame(dt)

In [58]:
# Display the dtype pandas assigned to each column of df.
df.dtypes

user_id                   int32
firstname                object
lastname                 object
fullname                 object
f_prob                  float64
m_prob                  float64
white_prob              float64
black_prob              float64
api_prob                float64
hispanic_prob           float64
native_prob             float64
multiple_prob           float64
prestige                float64
highest_degree           object
sex_predicted            object
ethnicity_predicted      object
profile_linkedin_url     object
user_location            object
user_country             object
profile_title            object
updated_dt               object
numconnections            int64
profile_summary          object
updated                    bool
dtype: object