# OCC daily EDA Transform for time aggregation

## EXPORTADO A SCRIPT

In [1]:
import pandas as pd
import matplotlib.pyplot as plt

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
spark = SparkSession \
    .builder \
    .appName("Learning Spark") \
    .getOrCreate()

date_str = "20231002"

base_dir = f"harvester/occ/{date_str}/"
df = spark.read.json(f"{base_dir}/occ-{date_str}.jsonl.gz")
categories = spark.read.json(f"{base_dir}/occ-{date_str}-categories.json")
subcategories = spark.read.json(f"{base_dir}/occ-{date_str}-subcategories.json")

output_filename = "harvester/occ/occ-timeseries.jsonl"

data = {}
data['date'] = date_str
data['record_version'] = "0.0.1"

In [27]:
def save(filename, data, append=True):
    import json
    
    mode = "w"
    if append:
        mode = "a"
    with open(filename, mode) as fo:
        fo.write(json.dumps(data))

In [4]:
dfu = df.dropDuplicates(["id"])
record_count = dfu.count()
data['total_record_count'] = df.count()
data['deduplicated_record_count'] = dfu.count()
data

{'date': '20231002',
 'record_version': '0.0.1',
 'total_record_count': 154198,
 'deduplicated_record_count': 148725}

In [5]:
data['count_by_redirect_type'] = dfu.groupby("redirect.type").count().toPandas().to_dict('records')
data

{'date': '20231002',
 'record_version': '0.0.1',
 'total_record_count': 154198,
 'deduplicated_record_count': 148725,
 'count_by_redirect_type': [{'type': 0, 'count': 73341},
  {'type': 1, 'count': 3228},
  {'type': 2, 'count': 72156}]}

In [6]:
data["count_by_jobType"] = dfu.groupby("jobType").count().toPandas().to_dict('records')

In [7]:
data["count_by_jobType_redirect_type"] = dfu.groupby("jobType").pivot("redirect.type").count().toPandas().to_dict('records')
data

{'date': '20231002',
 'record_version': '0.0.1',
 'total_record_count': 154198,
 'deduplicated_record_count': 148725,
 'count_by_redirect_type': [{'type': 0, 'count': 73341},
  {'type': 1, 'count': 3228},
  {'type': 2, 'count': 72156}],
 'count_by_jobType': [{'jobType': 'PREMIUM', 'count': 2603},
  {'jobType': 'STANDOUT', 'count': 27490},
  {'jobType': 'CLASSIC', 'count': 118632}],
 'count_by_jobType_redirect_type': [{'jobType': 'PREMIUM',
   '0': 2561,
   '1': 42,
   '2': nan},
  {'jobType': 'STANDOUT', '0': 27055, '1': 339, '2': 96.0},
  {'jobType': 'CLASSIC', '0': 43725, '1': 2847, '2': 72060.0}]}

In [8]:
cat_count = dfu.groupby("category").count().orderBy(F.col("count").desc())
split_col = F.split(cat_count["category.__ref"].cast("String"), ":")
cat_count = cat_count.withColumn("category_id", split_col.getItem(1).cast("INT"))
cat_count = cat_count.join(categories.select("id", "description"), cat_count.category_id == categories.id, how="inner")
cat_count = cat_count.select(["category_id", "description", "count"]).orderBy(F.col("count").desc())

In [9]:
cat_count = cat_count.withColumn("pct", F.col("count") / record_count)
data['count_by_category'] = cat_count.toPandas().to_dict("records")#.show(cat_count.count(), truncate=False)

### Proporciones por tipo de Redir

In [10]:
split_col = F.split(dfu["category.__ref"].cast("String"), ":")
cat_count_redir = dfu.withColumn("category_id", split_col.getItem(1).cast("INT"))
cat_count_redir = cat_count_redir.groupby("category_id").pivot("redirect.type").count()
cat_count_redir = cat_count_redir.join(categories.select("id", "description"), cat_count_redir.category_id == categories.id, how="inner")
cat_count_redir = cat_count_redir.select(["description", "0", "1", "2"]).sort(F.col("0").desc()).toPandas().fillna(0)
cat_count_redir.columns = ["Category", "NoRedir", "Redir1", "Redir2"]
cat_count_redir = cat_count_redir.set_index("Category")
cat_count_redir = cat_count_redir.div(cat_count_redir.sum(axis=1), axis=0)
data['proportions_by_category_by_redirect_type'] = cat_count_redir.to_dict('index')
data['proportions_by_category_by_redirect_type']

{'Ventas': {'NoRedir': 0.45058131106665505,
  'Redir1': 0.020237163317967006,
  'Redir2': 0.5291815256153779},
 'Contabilidad - Finanzas': {'NoRedir': 0.6043264805154955,
  'Redir1': 0.023933722000613684,
  'Redir2': 0.37173979748389074},
 'Tecnologías de la Información - Sistemas': {'NoRedir': 0.46528197077647626,
  'Redir1': 0.032061345248158436,
  'Redir2': 0.5026566839753653},
 'Logística - Transporte - Distribución - Almacén': {'NoRedir': 0.5246695673590017,
  'Redir1': 0.011985879648633117,
  'Redir2': 0.46334455299236516},
 'Administrativo': {'NoRedir': 0.3281299946723495,
  'Redir1': 0.018593500266382526,
  'Redir2': 0.653276505061268},
 'Manufactura - Producción - Operación': {'NoRedir': 0.498684704998121,
  'Redir1': 0.011367906801954153,
  'Redir2': 0.48994738819992484},
 'Ingeniería': {'NoRedir': 0.5788487951055064,
  'Redir1': 0.01810463228867524,
  'Redir2': 0.40304657260581844},
 'Recursos humanos': {'NoRedir': 0.6715921684521611,
  'Redir1': 0.025304765422977465,
  'Red

#### Categorías en las que OCC es débil

Mayor proporción de agregadas

## Subcategorías

In [11]:
subcat_count = dfu.groupby("subcategory").count().orderBy(F.col("count").desc())
split_col = F.split(subcat_count["subcategory.__ref"].cast("String"), ":")
subcat_count = subcat_count.withColumn("subcategory_id", split_col.getItem(1).cast("INT"))
subcat_count = subcat_count.join(subcategories.select("id", "description"), subcat_count.subcategory_id == subcategories.id, how="inner")
data['count_by_subcategory'] = subcat_count.orderBy(F.col("count").desc()).toPandas().to_dict('records')#.show(25, truncate=False)

## Google For Jobs

- Primer campo: si la oferta está disponible para G4J
- Segundo campo: (¿?) si la oferta ya está indexada por G4J

In [12]:
#ignore
dfu.groupby("googleForJobs").count().sort(F.col("count").desc()).toPandas().to_dict('records')#.show(truncate=False)

[{'googleForJobs': Row(__typename='GoogleForJobs', isGoogleForJobs=True, isGoogleIndexed=True),
  'count': 147707},
 {'googleForJobs': Row(__typename='GoogleForJobs', isGoogleForJobs=False, isGoogleIndexed=False),
  'count': 1018}]

## Ubicación

### Estados

#### Distribución de vacantes por estado con suma acumulada

In [13]:
dfu_loc = dfu.select("id", F.explode("location.locations").alias("loc_data"))
state_count = dfu_loc.groupby("loc_data.state.description").count().sort(F.col("count").desc())
#data['count_by_state'] = 
data['count_by_state'] = state_count.toPandas().to_dict('records')

In [14]:
state_count_p = state_count.withColumn("perc", F.col("count") / record_count).orderBy(F.col("perc").desc())
window = Window.orderBy(F.col("perc").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
state_count_p = state_count_p.withColumn("cumsum", F.sum(F.col("perc")).over(window))
state_count_p.show(33)

+-------------------+-----+--------------------+-------------------+
|        description|count|                perc|             cumsum|
+-------------------+-----+--------------------+-------------------+
|   Ciudad de México|42664|    0.28686501933098|   0.28686501933098|
|         Nuevo León|17718| 0.11913262733232476| 0.4059976466633048|
|            Jalisco|13209| 0.08881492687846697|0.49481257354177177|
|   Estado de México|11585| 0.07789544461253992| 0.5727080181543117|
|                   | 9345| 0.06283408976298538| 0.6355421079172971|
|          Querétaro| 6261|0.042097831568330815|  0.677639939485628|
|    Baja California| 4278| 0.02876449823499748| 0.7064044377206254|
|         Guanajuato| 4093|0.027520591696083375| 0.7339250294167088|
|          Chihuahua| 4002| 0.02690872415532022|  0.760833753572029|
|             Puebla| 3828|0.025738779626828037|  0.786572533198857|
|           Veracruz| 3556|0.023909900823667844| 0.8104824340225248|
|           Coahuila| 2911|0.01957

#### Vacantes por estado divididas por tipo Redir, con histograma para Redir2

In [15]:
#ignorar
state_count = dfu.withColumn("loc_data", F.explode("location.locations"))\
                .groupby("loc_data.state.description")\
                .pivot("redirect.type").count()\
                .withColumn("total", F.col("0") + F.col("1") + F.col("2"))\
                .withColumn("NoRedirPct", F.col("0") / F.col("total"))\
                .withColumn("Redir1Pct", F.col("1") / F.col("total"))\
                .withColumn("Redir2Pct", F.col("2") / F.col("total"))\
                .sort(F.col("total").desc())

#### Descripción (granular)

In [16]:
loc_count = dfu.groupby("location.description").count().sort(F.col("count").desc())
loc_count_p = loc_count.withColumn("perc", F.col("count") / record_count).orderBy(F.col("perc").desc())
window = Window.orderBy(F.col("perc").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
loc_count_p = loc_count_p.withColumn("cumsum", F.sum(F.col("perc")).over(window))
print(f"Número de location.description: {loc_count_p.count()}")
data['count_by_granular_location'] = loc_count_p.toPandas().to_dict('records')#show(30, truncate=False)

Número de location.description: 747


## Compañias

Basadas en la url. 

NULL ==> Confidenciales

In [17]:
company_count = dfu.groupby("company.url").count().sort(F.col("count").desc())
print(f"Número de URLs de compañias: {company_count.count()}")
data['count_by_company_url'] = company_count.toPandas().to_dict('records')#.show(25, truncate=False)

Número de URLs de compañias: 14044


#### Proporción de vacantes por cia y suma acumulada

In [18]:
#ignorar
ccp = company_count.withColumn("perc", F.col("count") / record_count).orderBy(F.col("perc").desc())
window = Window.orderBy(F.col("perc").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
ccp.withColumn("cumsum", F.sum(F.col("perc")).over(window)).show(30, truncate=False)

+---------------------------------------------------------+-----+---------------------+-------------------+
|url                                                      |count|perc                 |cumsum             |
+---------------------------------------------------------+-----+---------------------+-------------------+
|null                                                     |15951|0.10725163893091276  |0.10725163893091276|
|empleos/bolsa-de-trabajo-grupo-salinas/                  |11613|0.07808371154815935  |0.18533535047907213|
|empleos/bolsa-de-trabajo-Test-and-QA-Corporation/        |6609 |0.04443772062531518  |0.2297730711043873 |
|empleos/bolsa-de-trabajo-GrabJobs/                       |5001 |0.033625819465456376 |0.26339889056984367|
|empleos/bolsa-de-trabajo-Un-Mejor-Empleo/                |2822 |0.018974617582787023 |0.2823735081526307 |
|empleos/bolsa-de-trabajo-Trabajos-Diarios-MX/            |2213 |0.01487981173306438  |0.29725331988569503|
|empleos/bolsa-de-trabajo-Gr

### Excluyendo confidenciales

In [19]:
#ignorar
ccp_noconf = company_count.where("url is not null")
nconf_records = ccp_noconf.agg(F.sum("count")).collect()[0][0]
ccp_noconf = ccp_noconf.withColumn("perc", F.col("count") / nconf_records).orderBy(F.col("perc").desc())
window = Window.orderBy(F.col("perc").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
ccp_noconf.withColumn("cumsum", F.sum(F.col("perc")).over(window)).show(30, truncate=False)

+---------------------------------------------------------+-----+---------------------+-------------------+
|url                                                      |count|perc                 |cumsum             |
+---------------------------------------------------------+-----+---------------------+-------------------+
|empleos/bolsa-de-trabajo-grupo-salinas/                  |11613|0.08746441321343033  |0.08746441321343033|
|empleos/bolsa-de-trabajo-Test-and-QA-Corporation/        |6609 |0.04977631162727642  |0.13724072484070676|
|empleos/bolsa-de-trabajo-GrabJobs/                       |5001 |0.03766550680103032  |0.17490623164173708|
|empleos/bolsa-de-trabajo-Un-Mejor-Empleo/                |2822 |0.021254161206260263 |0.19616039284799733|
|empleos/bolsa-de-trabajo-Trabajos-Diarios-MX/            |2213 |0.016667419826170787 |0.21282781267416812|
|empleos/bolsa-de-trabajo-Grupo-Salinas/                  |1628 |0.01226143672707006  |0.22508924940123817|
|empleos/bolsa-de-trabajo-au

### Confidenciales

La URL null son confidenciales

In [20]:
data['confidenciales'] = dfu.select(["company.url", "company.name"]).where("company.confidential=TRUE").count()

In [21]:
company_count = dfu.where("redirect.type != 2").groupby("company.url").count().sort(F.col("count").desc())
data['count_by_company_where_not_redirected'] = company_count.toPandas().to_dict('records')#show(truncate=False)

In [22]:
company_count = dfu.where("redirect.type == 2").groupby("company.url").count().sort(F.col("count").desc())
data['count_by_company_where_redirected'] = company_count.toPandas().to_dict('records')#.show(truncate=False)

## Salarios

In [23]:
dfu.select(["salary.from", "salary.to"]).summary().show()

+-------+-----------------+------------------+
|summary|             from|                to|
+-------+-----------------+------------------+
|  count|           148725|            148725|
|   mean|6112.914923516558|  7043.87521264078|
| stddev|11584.42173422566|13231.213591792068|
|    min|                0|                 0|
|    25%|                0|                 0|
|    50%|                0|                 0|
|    75%|            10000|             12000|
|    max|           485000|            500000|
+-------+-----------------+------------------+



Casi todas las Redir2 (agregadas) no tienen salario, vs 1/3 de las pagadas:

In [24]:
dfu.withColumn("has_salary", F.col("salary.from") != 0).groupby("redirect.type").pivot("has_salary").count().show()

+----+-----+-----+
|type|false| true|
+----+-----+-----+
|   0|24507|48834|
|   1| 1845| 1383|
|   2|69131| 3025|
+----+-----+-----+



Al parecer si alguno de los dos campos de salario es mayor a 0, el otro también lo es

In [25]:
salary = dfu.select(["salary.from", "salary.to"]).where("salary.from > 0 or salary.to > 0")
salary = salary.withColumn("avg", (F.col("from") + F.col("to")) / 2)
data['salary_summary'] = salary.summary().toPandas().to_dict('records')#.show()

In [28]:
save(output_filename, data)