In [103]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.functions import udf

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, ByteType, DateType

In [32]:
CREDENTIALS_LOCATION = '/home/emilel/.secrets/gcp/gcp-secret.json'
SPARK_LIB = "/home/emilel/dez-project-emi/lib/gcs-connector-hadoop3-2.2.5.jar"

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", SPARK_LIB) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", CREDENTIALS_LOCATION)

In [33]:
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", CREDENTIALS_LOCATION)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [34]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [35]:
idm_schema = StructType([
    StructField('year', ShortType(), True)
    , StructField('entity_code', ByteType(), True)
    , StructField('entity_name', StringType(), True)
    , StructField('municipality_code', ShortType(), True)
    , StructField('municipality_name', StringType(), True)
    , StructField('affected_legal_asset', StringType(), True)
    , StructField('crime_type', StringType(), True)
    , StructField('crime_subtype', StringType(), True)
    , StructField('crime_modality_type', StringType(), True)
    , StructField('january', IntegerType(), True)
    , StructField('february', IntegerType(), True)
    , StructField('march', IntegerType(), True)
    , StructField('april', IntegerType(), True)
    , StructField('may', IntegerType(), True)
    , StructField('june', IntegerType(), True)
    , StructField('july', IntegerType(), True)
    , StructField('august', IntegerType(), True)
    , StructField('september', IntegerType(), True)
    , StructField('october', IntegerType(), True)
    , StructField('november', IntegerType(), True)
    , StructField('december', IntegerType(), True)
])

In [81]:
unpivoting_columns = ['year'
                      , 'entity_code'
                      , 'entity_name'
                      , 'municipality_code'
                      , 'municipality_name'
                      , 'affected_legal_asset'
                      , 'crime_type'
                      , 'crime_subtype'
                      , 'crime_modality_type']


stack_query_expression = '''
stack(12
, "january", january
, "february", february
, "march", march
, "april", april
, "may", may
, "june", june
, "july", july
, "august", august
, "september", september
, "october", october
, "november", november
, "december", december
) as (month, crimes)
'''

month_dict = {
    'january'     : '01-01'
    , 'february'  : '02-01'
    , 'march'     : '03-01'
    , 'april'     : '04-01'
    , 'may'       : '05-01'
    , 'june'      : '06-01'
    , 'july'      : '07-01'
    , 'august'    : '08-01'
    , 'september' : '09-01'
    , 'october'   : '10-01'
    , 'november'  : '11-01'
    , 'december'  : '12-01'
}

In [95]:
@udf(returnType=StringType())
def get_first_day_of_month_date(year, month_name):
    return str(year) + '-' + month_dict[month_name]

In [96]:
idm_df = spark.read.option('header', True).schema(idm_schema).csv('gs://landing_bucket_dez/idm.csv')

In [97]:
unpivoted_df = idm_df.selectExpr(*unpivoting_columns
                                 , stack_query_expression)

In [102]:
date_df = unpivoted_df.withColumn('info_month_date', get_first_day_of_month_date_udf('year', 'month').cast(DateType()))

In [104]:
date_df.write.partitionBy('year').parquet('gs://landing_bucket_dez/pq/idm/')

24/03/16 23:58:57 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: A�o, Clave_Ent, Entidad, Cve. Municipio, Municipio, Bien jur�dico afectado, Tipo de delito, Subtipo de delito, Modalidad, Enero, Febrero, Marzo, Abril, Mayo, Junio, Julio, Agosto, Septiembre, Octubre, Noviembre, Diciembre
 Schema: year, entity_code, entity_name, municipality_code, municipality_name, affected_legal_asset, crime_type, crime_subtype, crime_modality_type, january, february, march, april, may, june, july, august, september, october, november, december
Expected: year but found: A�o
CSV file: gs://landing_bucket_dez/idm.csv


[Stage 39:>                                                         (0 + 8) / 8]

24/03/16 23:59:37 ERROR Executor: Exception in task 3.0 in stage 39.0 (TID 66)
java.lang.OutOfMemoryError: Java heap space
24/03/16 23:59:37 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3.0 in stage 39.0 (TID 66),5,main]
java.lang.OutOfMemoryError: Java heap space
24/03/16 23:59:37 WARN TaskSetManager: Lost task 3.0 in stage 39.0 (TID 66) (dez-emil-vm.us-west4-b.c.dez-workspace-emil.internal executor driver): java.lang.OutOfMemoryError: Java heap space

24/03/16 23:59:37 ERROR TaskSetManager: Task 3 in stage 39.0 failed 1 times; aborting job
24/03/16 23:59:37 ERROR FileFormatWriter: Aborting job 48053e8c-1213-494c-bdf1-0e9f75121aed.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 66) (dez-emil-vm.us-west4-b.c.dez-workspace-emil.internal executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktra

ERROR:root:Exception while sending command.                         (0 + 7) / 8]
Traceback (most recent call last):
  File "/home/emilel/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emilel/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/emilel/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o724.parquet

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/emilel/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emilel/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/emilel/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
