In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=ab79d14d60ad0e8f45b086dd0834ffca13aaa3fdc233fc86bef8c406f5b409a6
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType

In [None]:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/service-key-gcloud.json'

In [None]:
spark = SparkSession.builder \
    .appName('spark-run-with-gcp-bucket') \
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar, https://storage.googleapis.com/spark-lib/bigquery/spark-3.3-bigquery-0.32.0.jar") \
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") \
    .getOrCreate()


In [None]:
bucket_name = 'cnpj-data'
folder_name = 'extracted'

In [None]:
socios_schema = StructType([
    StructField("cnpj_basico", StringType(), True),
    StructField("cod_identificador_socio", IntegerType(), True),
    StructField("nome_socio", StringType(), True),
    StructField("cpf_cnpj_socio", StringType(), True),
    StructField("cod_qualificacao_socio", IntegerType(), True),
    StructField("data_entrada_sociedade", StringType(), True),
    StructField("cod_pais", IntegerType(), True),
    StructField("representante_legal", StringType(), True),
    StructField("nome_representante", StringType(), True),
    StructField("cod_qualificacao_representante", IntegerType(), True),
    StructField("cod_faixa_etaria", IntegerType(), True),
])

socios_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/*.SOCIOCSV', schema = socios_schema, sep = ';')

socios_df = socios_df.withColumn('data_entrada_sociedade', f.to_date(socios_df["data_entrada_sociedade"], "yyyyMMdd"))

socios_df.show()

+-----------+-----------------------+--------------------+--------------+----------------------+----------------------+--------+-------------------+------------------+------------------------------+----------------+
|cnpj_basico|cod_identificador_socio|          nome_socio|cpf_cnpj_socio|cod_qualificacao_socio|data_entrada_sociedade|cod_pais|representante_legal|nome_representante|cod_qualificacao_representante|cod_faixa_etaria|
+-----------+-----------------------+--------------------+--------------+----------------------+----------------------+--------+-------------------+------------------+------------------------------+----------------+
|   46204351|                      2|RUTHIELLI DOS SAN...|   ***627370**|                    49|            2022-04-29|    null|        ***000000**|              null|                             0|               3|
|   46204403|                      2|LUIZ FELIPE FRANC...|   ***446578**|                    49|            2022-04-29|    null|        

In [None]:
socios_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.socios") \
  .mode("overwrite") \
  .save()

In [None]:
empresas_schema = StructType([
    StructField("cnpj_basico", StringType(), True),
    StructField("razao_social", StringType(), True),
    StructField("cod_natureza_juridica", IntegerType(), True),
    StructField("cod_qualificacao_responsavel", IntegerType(), True),
    StructField("capital_social", StringType(), True),
    StructField("cod_porte_empresa", IntegerType(), True),
    StructField("ente_federativo", StringType(), True)
])

empresas_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/*.EMPRECSV', schema = empresas_schema, sep = ';')

empresas_df = empresas_df.withColumn("capital_social", f.regexp_replace(f.col("capital_social"), ",", ".").cast(IntegerType()))

empresas_df.show()


+-----------+--------------------+---------------------+----------------------------+--------------+-----------------+---------------+
|cnpj_basico|        razao_social|cod_natureza_juridica|cod_qualificacao_responsavel|capital_social|cod_porte_empresa|ente_federativo|
+-----------+--------------------+---------------------+----------------------------+--------------+-----------------+---------------+
|   41273597|PACHARRUS QUEIROZ...|                 2135|                          50|          5000|                1|           null|
|   41273598|GLORIA VIANA DIAS...|                 2135|                          50|          1100|                1|           null|
|   41273599|ANA PAULA DA SILV...|                 2135|                          50|          2000|                1|           null|
|   41273600|41.273.600 AVANIL...|                 2135|                          50|         50000|                1|           null|
|   41273601|GABRIELA HELENA F...|                 2135

In [None]:
empresas_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.empresas") \
  .mode("overwrite") \
  .save()

In [None]:
estabelecimento_schema = StructType([
    StructField("cnpj_basico", StringType(), True),
    StructField("cnpj_ordem", StringType(), True),
    StructField("cnpj_digito_verificador", StringType(), True),
    StructField("cod_identificador_matriz", IntegerType(), True),
    StructField("nome_fantasia", StringType(), True),
    StructField("cod_situacao_cadastral", IntegerType(), True),
    StructField("data_situacao_cadastral", StringType(), True),
    StructField("cod_motivo", IntegerType(), True),
    StructField("nome_cidade_exterior", StringType(), True),
    StructField("cod_pais", IntegerType(), True),
    StructField("data_inicio", StringType(), True),
    StructField("cod_cnae_principal", StringType(), True),
    StructField("cod_cnae_secundario", StringType(), True),
    StructField("tipo_logradouro", StringType(), True),
    StructField("logradouro", StringType(), True),
    StructField("numero", StringType(), True),
    StructField("complemento", StringType(), True),
    StructField("bairro", StringType(), True),
    StructField("cep", StringType(), True),
    StructField("uf", StringType(), True),
    StructField("cod_municipio", IntegerType(), True),
    StructField("ddd_1", StringType(), True),
    StructField("telefone_1", StringType(), True),
    StructField("ddd_2", StringType(), True),
    StructField("telefone_2", StringType(), True),
    StructField("ddd_fax", StringType(), True),
    StructField("fax", StringType(), True),
    StructField("correio_eletronico", StringType(), True),
    StructField("situacao_especial", StringType(), True),
    StructField("data_situacao_especial", StringType(), True)
])

estabelecimento_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/*.ESTABELE', schema = estabelecimento_schema, sep = ';')

estabelecimento_df = estabelecimento_df.withColumn('data_situacao_cadastral', f.to_date(estabelecimento_df["data_situacao_cadastral"], "yyyyMMdd"))
estabelecimento_df = estabelecimento_df.withColumn('data_inicio', f.to_date(estabelecimento_df["data_inicio"], "yyyyMMdd"))
estabelecimento_df = estabelecimento_df.withColumn('data_situacao_especial', f.to_date(estabelecimento_df["data_situacao_especial"], "yyyyMMdd"))

estabelecimento_df.show()

+-----------+----------+-----------------------+------------------------+--------------------+----------------------+-----------------------+----------+--------------------+--------+-----------+--------------+--------------------+---------------+--------------------+------+--------------------+--------------------+--------+---+-------------+-----+----------+-----+----------+-------+--------+--------------------+-----------------+----------------------+
|cnpj_basico|cnpj_ordem|cnpj_digito_verificador|cod_identificador_matriz|       nome_fantasia|cod_situacao_cadastral|data_situacao_cadastral|cod_motivo|nome_cidade_exterior|cod_pais|data_inicio|cnae_principal|     cnae_secundario|tipo_logradouro|          logradouro|numero|         complemento|              bairro|     cep| uf|cod_municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_fax|     fax|  correio_eletronico|situacao_especial|data_situacao_especial|
+-----------+----------+-----------------------+------------------------+-------------

In [None]:
estabelecimento_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.estabelecimento") \
  .mode("overwrite") \
  .save()

In [None]:
paises_schema = StructType([
    StructField("cod_pais", IntegerType(), True),
    StructField("pais", StringType(), True)
])

paises_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$Z.D30610.PAISCSV',schema = paises_schema, sep = ';')

paises_df.show()

+--------+--------------------+
|cod_pais|                pais|
+--------+--------------------+
|       0|       COLIS POSTAUX|
|      13|         AFEGANISTAO|
|      17|             ALBANIA|
|      20|ALBORAN-PEREJIL,I...|
|      23|            ALEMANHA|
|      25|ALEMANHA, REP. DE...|
|      31|        BURKINA FASO|
|      37|             ANDORRA|
|      40|              ANGOLA|
|      41|            ANGUILLA|
|      43|   ANTIGUA E BARBUDA|
|      47| ANTILHAS HOLANDESAS|
|      53|      ARABIA SAUDITA|
|      59|             ARGELIA|
|      63|           ARGENTINA|
|      64|ARMENIA, REPUBLIC...|
|      65|               ARUBA|
|      69|           AUSTRALIA|
|      72|             AUSTRIA|
|      73|AZERBAIJAO, REPUB...|
+--------+--------------------+
only showing top 20 rows



In [None]:
paises_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.paises") \
  .mode("overwrite") \
  .save()

In [None]:
municipios_schema = StructType([
    StructField("cod_municipio", IntegerType(), True),
    StructField("municipio", StringType(), True)
])

municipios_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$Z.D30610.MUNICCSV',schema = municipios_schema, sep = ';')

municipios_df.show()

+-------------+--------------------+
|cod_municipio|           municipio|
+-------------+--------------------+
|            1|       GUAJARA-MIRIM|
|            2|ALTO ALEGRE DOS P...|
|            3|         PORTO VELHO|
|            4|             BURITIS|
|            5|           JI-PARANA|
|            6|         CHUPINGUAIA|
|            7|           ARIQUEMES|
|            8|             CUJUBIM|
|            9|              CACOAL|
|           10|          NOVA UNIAO|
|           11|       PIMENTA BUENO|
|           12|             PARECIS|
|           13|             VILHENA|
|           14|PIMENTEIRAS DO OESTE|
|           15|                JARU|
|           16|PRIMAVERA DE ROND...|
|           17| OURO PRETO DO OESTE|
|           18|  SAO FELIPE D'OESTE|
|           19|   PRESIDENTE MEDICI|
|           20|SAO FRANCISCO DO ...|
+-------------+--------------------+
only showing top 20 rows



In [None]:
municipios_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.municipios") \
  .mode("overwrite") \
  .save()

In [None]:
qualificacao_schema = StructType([
    StructField("cod_qualificacao", IntegerType(), True),
    StructField("qualificacao", StringType(), True)
])

qualificacao_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$Z.D30610.QUALSCSV', schema = qualificacao_schema, sep = ';',encoding="Windows-1252")

qualificacao_df.show()

+----------------+--------------------+
|cod_qualificacao|        qualificacao|
+----------------+--------------------+
|               0|       Não informada|
|               5|       Administrador|
|               8|Conselheiro de Ad...|
|               9|             Curador|
|              10|             Diretor|
|              11|         Interventor|
|              12|       Inventariante|
|              13|          Liquidante|
|              14|                 Mãe|
|              15|                 Pai|
|              16|          Presidente|
|              17|          Procurador|
|              18|          Secretário|
|              19|Síndico (Condomínio)|
|              20|Sociedade Consorc...|
|              21|   Sociedade Filiada|
|              22|               Sócio|
|              23|   Sócio Capitalista|
|              24|   Sócio Comanditado|
|              25|  Sócio Comanditário|
+----------------+--------------------+
only showing top 20 rows



In [None]:
qualificacao_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.qualificacao") \
  .mode("overwrite") \
  .save()

In [None]:
natureza_schema = StructType([
    StructField("cod_natureza_juridica", IntegerType(), True),
    StructField("natureza_juridica", StringType(), True)
])

natureza_juridica_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$Z.D30610.NATJUCSV', schema = natureza_schema, sep = ';', encoding="Windows-1252")

natureza_juridica_df.show()

+---------------------+--------------------+
|cod_natureza_juridica|   natureza_juridica|
+---------------------+--------------------+
|                    0|Natureza Jurídica...|
|                 3271|Órgão de Direção ...|
|                 3280|Comitê Financeiro...|
|                 3298|Frente Plebiscitá...|
|                 3301|Organização Socia...|
|                 3999|  Associação Privada|
|                 4014|Empresa Individua...|
|                 4090|Candidato a Cargo...|
|                 4120|Produtor Rural (P...|
|                 5010|Organização Inter...|
|                 5029|Representação Dip...|
|                 1015|Órgão Público do ...|
|                 1023|Órgão Público do ...|
|                 1031|Órgão Público do ...|
|                 1040|Órgão Público do ...|
|                 1058|Órgão Público do ...|
|                 1066|Órgão Público do ...|
|                 1074|Órgão Público do ...|
|                 1082|Órgão Público do ...|
|         

In [None]:
natureza_juridica_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.natureza_juridica") \
  .mode("overwrite") \
  .save()

In [None]:
cnae_schema = StructType([
    StructField("cod_cnae", StringType(), True),
    StructField("atividade", StringType(), True)
])

cnae_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$Z.D30610.CNAECSV', schema = cnae_schema, sep = ';', encoding="Windows-1252")

cnae_df.show()

+--------+--------------------+
|cod_cnae|           atividade|
+--------+--------------------+
|  111301|    Cultivo de arroz|
|  111302|    Cultivo de milho|
|  111303|    Cultivo de trigo|
|  111399|Cultivo de outros...|
|  112101|Cultivo de algodã...|
|  112102|     Cultivo de juta|
|  112199|Cultivo de outras...|
|  113000|Cultivo de cana-d...|
|  114800|     Cultivo de fumo|
|  115600|     Cultivo de soja|
|  116401| Cultivo de amendoim|
|  116402| Cultivo de girassol|
|  116403|   Cultivo de mamona|
|  116499|Cultivo de outras...|
|  119901|  Cultivo de abacaxi|
|  119902|     Cultivo de alho|
|  119903|Cultivo de batata...|
|  119904|   Cultivo de cebola|
|  119905|   Cultivo de feijão|
|  119906| Cultivo de mandioca|
+--------+--------------------+
only showing top 20 rows



In [None]:
cnae_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.cnae") \
  .mode("overwrite") \
  .save()

In [None]:
motivos_schema = StructType([
    StructField("cod_motivo", IntegerType(), True),
    StructField("motivo", StringType(), True)
])

motivos_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$Z.D30610.MOTICSV',  schema = motivos_schema, sep = ';', encoding="Windows-1252")

motivos_df.show()

+----------+--------------------+
|cod_motivo|              motivo|
+----------+--------------------+
|         0|          SEM MOTIVO|
|         1|EXTINCAO POR ENCE...|
|         2|        INCORPORACAO|
|         3|               FUSAO|
|         4|         CISAO TOTAL|
|         5|ENCERRAMENTO DA F...|
|         6|ENCERRAMENTO DA L...|
|         7|   ELEVACAO A MATRIZ|
|         8|          TRANSPASSE|
|         9|NAO INICIO DE ATI...|
|        10|EXTINCAO PELO ENC...|
|        11|ANULACAO POR MULT...|
|        12|ANULACAO ONLINE D...|
|        13|     OMISSA CONTUMAZ|
|        14|OMISSA NAO LOCALI...|
|        15|INEXISTENCIA DE FATO|
|        16| ANULACAO POR VICIOS|
|        17|BAIXA INICIADA EM...|
|        18|INTERRUPCAO TEMPO...|
|        21|PEDIDO DE BAIXA I...|
+----------+--------------------+
only showing top 20 rows



In [None]:
motivos_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.motivos") \
  .mode("overwrite") \
  .save()

In [None]:
porte_empresa =[
    (0,'Não informado'),
    (1,'Micro empresa'),
    (3,'Pequeno porte'),
    (5,'Demais')
]

porte_schema = StructType([
    StructField("cod_porte_empresa", IntegerType(), True),
    StructField("porte_empresa", StringType(), True)
])

porte_empresa_df = spark.createDataFrame(porte_empresa, schema = porte_schema)

porte_empresa_df.show()
porte_empresa_df.printSchema()

+-----------------+-------------+
|cod_porte_empresa|porte_empresa|
+-----------------+-------------+
|                0|Não informado|
|                1|Micro empresa|
|                3|Pequeno porte|
|                5|       Demais|
+-----------------+-------------+

root
 |-- cod_porte_empresa: integer (nullable = true)
 |-- porte_empresa: string (nullable = true)



In [None]:
porte_empresa_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.porte_empresa") \
  .mode("overwrite") \
  .save()

In [None]:
identificador_matriz =[
    (1,'Matriz'),
    (2,'Filial')
]

identificador_matriz_schema = StructType([
    StructField("cod_identificador_matriz", IntegerType(), True),
    StructField("matriz_filial", StringType(), True)
])

identificador_matriz_df = spark.createDataFrame(identificador_matriz, schema = identificador_matriz_schema)

identificador_matriz_df.show()

+------------------------+-------------+
|cod_identificador_matriz|matriz_filial|
+------------------------+-------------+
|                       1|       Matriz|
|                       2|       Filial|
+------------------------+-------------+



In [None]:
identificador_matriz_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.identificador_matriz") \
  .mode("overwrite") \
  .save()

In [None]:
situacao_cadastral =[
    (1,'Nula'),
    (2,'Ativa'),
    (3,'Suspensa'),
    (4,'Inapta'),
    (8,'Baixada'),
]

situacao_schema = StructType([
    StructField("cod_situacao_cadastral", IntegerType(), True),
    StructField("situacao_cadastral", StringType(), True)
])

situacao_cadastral_df = spark.createDataFrame(situacao_cadastral, schema = situacao_schema)

situacao_cadastral_df.show()

+----------------------+------------------+
|cod_situacao_cadastral|situacao_cadastral|
+----------------------+------------------+
|                     1|              Nula|
|                     2|             Ativa|
|                     3|          Suspensa|
|                     4|            Inapta|
|                     8|           Baixada|
+----------------------+------------------+



In [None]:
situacao_cadastral_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.situacao_cadastral") \
  .mode("overwrite") \
  .save()

In [None]:
identificador_socio =[
    (1,'Pessoa Juridica'),
    (2,'Pessoa Fisica'),
    (3,'Estrangeiro')
]

socio_schema = StructType([
    StructField("cod_identificador_socio", IntegerType(), True),
    StructField("identificador_socio", StringType(), True)
])

identificador_socio_df = spark.createDataFrame(identificador_socio, schema = socio_schema)

identificador_socio_df.show()

+-----------------------+-------------------+
|cod_identificador_socio|identificador_socio|
+-----------------------+-------------------+
|                      1|    Pessoa Juridica|
|                      2|      Pessoa Fisica|
|                      3|        Estrangeiro|
+-----------------------+-------------------+



In [None]:
identificador_socio_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.identificador_socio") \
  .mode("overwrite") \
  .save()

In [None]:
faixa_etaria =[
    (1,'0 a 12'),
    (2,'13 a 20'),
    (3,'21 a 30'),
    (4,'31 a 40'),
    (5,'41 a 50'),
    (6,'51 a 60'),
    (7,'61 a 70'),
    (8,'71 a 80'),
    (9,'+80'),
    (0,'Não se aplica'),
]

faixa_etaria_schema = StructType([
    StructField("cod_faixa_etaria", IntegerType(), True),
    StructField("faixa_etaria", StringType(), True)
])
faixa_etaria_df = spark.createDataFrame(faixa_etaria, schema = faixa_etaria_schema)

faixa_etaria_df.show()

+----------------+-------------+
|cod_faixa_etaria| faixa_etaria|
+----------------+-------------+
|               1|       0 a 12|
|               2|      13 a 20|
|               3|      21 a 30|
|               4|      31 a 40|
|               5|      41 a 50|
|               6|      51 a 60|
|               7|      61 a 70|
|               8|      71 a 80|
|               9|          +80|
|               0|Não se aplica|
+----------------+-------------+



In [None]:
faixa_etaria_df.write \
  .format("bigquery") \
  .option("writeMethod","direct") \
  .option("table", "light-truth-391719.processed_cpnj_data.faixa_etaria") \
  .mode("overwrite") \
  .save()

In [None]:
simples_schema = StructType([
    StructField("cnpj_basico", StringType(), True),
    StructField("opcao_simples", StringType(), True),
    StructField("data_opcao_simples", StringType(), True),
    StructField("data_exclusao_simples", StringType(), True),
    StructField("opcao_mei", StringType(), True),
    StructField("data_opcao_mei", StringType(), True),
    StructField("data_exclusao_mei", StringType(), True)
])

simples_df = spark.read.csv(f'gs://{bucket_name}/{folder_name}/F.K03200$W.SIMPLES.CSV.D30610', schema = simples_schema, sep = ';')

simples_df = simples_df.withColumn('data_opcao_simples', f.to_date(simples_df["data_opcao_simples"], "yyyyMMdd"))
simples_df = simples_df.withColumn('data_exclusao_simples', f.to_date(simples_df["data_exclusao_simples"], "yyyyMMdd"))
simples_df = simples_df.withColumn('data_opcao_mei', f.to_date(simples_df["data_opcao_mei"], "yyyyMMdd"))
simples_df = simples_df.withColumn('data_exclusao_mei', f.to_date(simples_df["data_exclusao_mei"], "yyyyMMdd"))

simples_df.show()

+-----------+-------------+------------------+---------------------+---------+--------------+-----------------+
|cnpj_basico|opcao_simples|data_opcao_simples|data_exclusao_simples|opcao_mei|data_opcao_mei|data_exclusao_mei|
+-----------+-------------+------------------+---------------------+---------+--------------+-----------------+
|   00000000|            N|        2007-07-01|           2007-07-01|        N|    2009-07-01|       2009-07-01|
|   00000006|            N|        2018-01-01|           2019-12-31|        N|          null|             null|
|   00000008|            N|        2014-01-01|           2021-12-31|        N|          null|             null|
|   00000011|            S|        2007-07-01|                 null|        N|          null|             null|
|   00000013|            S|        2009-01-01|                 null|        N|          null|             null|
|   00000015|            N|        2007-07-01|           2008-12-31|        N|          null|           

In [None]:
batch_size = 10000

batched_df = simples_df.repartition(batch_size)

batched_df.write \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .option("table", "light-truth-391719.processed_cpnj_data.simples") \
    .mode("append") \
    .save()