In [1]:
# Inputs: the column -> type mapping configuration and the raw CSV to import.
data_filename_import = "/data/input/users/load.csv"
config_types_mapping_filename = "/config/types_mapping.json"

# Export targets (CSV and Parquet) for the processed data.
data_filename_csv_product_export = "/data/output/product_export_spark.csv"
data_filename_parquet_product_export = "/data/output/product_export_spark.parquet"

In [2]:
# Key column: rows sharing this value are treated as duplicates of each other.
column_drop_duplicates = "id"

# Column the rows are ordered by (descending) before de-duplication.
column_sort_values = "update_date"

# Datetime pattern used to parse the sort column from text into a timestamp.
time_format = "yyyy-MM-dd' 'HH:mm:ss.SSSSSS"

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext

# Create (or reuse) the Spark session, pointing it at the master at spark://spark:7077.
spark = (
    SparkSession.builder
    .appName("ImportExportCSVToParquet")
    .master("spark://spark:7077")
    .getOrCreate()
)

# Handles derived from the session; later cells use sqlCtx to build DataFrames.
sc = spark.sparkContext
sqlCtx = SQLContext(sc)

In [4]:
# from pyspark.sql.types import *
# schema = StructType([
#    StructField("id", IntegerType(), True),
#    StructField("name", StringType(), True),
#    StructField("email", StringType(), True),
#    StructField("phone", StringType(), True),
#    StructField("address", StringType(), True),
#    StructField("age", IntegerType(), True),
#    StructField("create_date", StringType(), True),
#    StructField("update_date", StringType(), True)])

In [5]:
import pandas as pd

# Read the raw CSV with pandas, then hand the pandas frame to Spark.
df_row_data = pd.read_csv(data_filename_import)

# Build the Spark DataFrame through the SparkSession: SQLContext is deprecated
# since Spark 3.0 and only forwards to the session anyway.
# To force an explicit schema instead of inference:
# sdf_row_data = spark.createDataFrame(df_row_data, schema)
sdf_row_data = spark.createDataFrame(df_row_data)

# Show the inferred column types.
sdf_row_data.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('email', 'string'),
 ('phone', 'string'),
 ('address', 'string'),
 ('age', 'bigint'),
 ('create_date', 'string'),
 ('update_date', 'string')]

In [6]:
from pyspark.sql.functions import to_timestamp, desc

# Parse the sort column (read as a string) into a timestamp using time_format,
# replacing the original column in place.
parsed_sort_column = to_timestamp(sdf_row_data[column_sort_values], time_format)
sdf_conveted_column_to_sort_data = sdf_row_data.withColumn(column_sort_values, parsed_sort_column)

# Order the rows by the parsed timestamp, newest first.
sdf_sorted_data = sdf_conveted_column_to_sort_data.orderBy(
    sdf_conveted_column_to_sort_data[column_sort_values].desc()
)

In [7]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# drop_duplicates() does not promise WHICH of the duplicate rows it keeps, so
# the preceding orderBy cannot be relied on to retain the newest record per key.
# Rank the rows of each key explicitly (newest first) and keep only rank 1.
# Descending order places null timestamps last, so a parsed timestamp always
# wins over an unparseable one.
latest_first_per_key = Window.partitionBy(column_drop_duplicates).orderBy(
    col(column_sort_values).desc()
)
sdf_deduplicate_data = (
    sdf_sorted_data
    .withColumn("__dedup_rank", row_number().over(latest_first_per_key))
    .filter(col("__dedup_rank") == 1)
    .drop("__dedup_rank")  # helper column must not leak into the result
)

In [8]:
# Preview the de-duplicated rows (up to 20 rows, long cell values truncated).
sdf_deduplicate_data.show(n=20, truncate=True)

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



In [9]:
# Load the helper that builds the column -> type mapping.
from load_mapping_types import load_mapping_types

# Build the mapping from the JSON configuration file and the current column
# types of the de-duplicated frame.
# NOTE(review): the meaning of the trailing positional `False` is defined by
# load_mapping_types and is not visible here — confirm against that module.
types_mapping_dict = load_mapping_types(
    config_types_mapping_filename,
    sdf_deduplicate_data.dtypes,
    False,
)
print(types_mapping_dict)

{'id': 'bigint', 'name': 'string', 'email': 'string', 'phone': 'string', 'address': 'string', 'age': 'integer', 'create_date': 'timestamp', 'update_date': 'timestamp'}


In [10]:
from pyspark.sql.functions import col

def convert_type_data_frame(df, types_mapping=None):
    """Cast the columns of ``df`` according to a column -> type mapping.

    Only the columns named in the mapping are selected, in the mapping's
    iteration order; any other column of ``df`` is absent from the result.

    Args:
        df: Spark DataFrame whose columns should be cast.
        types_mapping: Optional dict of column name -> Spark type name.
            When omitted, the notebook-level ``types_mapping_dict`` is used,
            which keeps ``df.transform(convert_type_data_frame)`` working.

    Returns:
        A new DataFrame with each mapped column cast to its target type and
        keeping its original name.
    """
    # Fall back to the notebook-level mapping only when none is passed, so the
    # function can also be used without relying on that global.
    mapping = types_mapping_dict if types_mapping is None else types_mapping
    return df.select(
        *(col(name).cast(target_type).alias(name) for name, target_type in mapping.items())
    )

# Apply the type conversion to the de-duplicated frame.
df_converted_type_data = convert_type_data_frame(sdf_deduplicate_data)

In [11]:
# Inspect the column types after the cast.
df_converted_type_data.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('email', 'string'),
 ('phone', 'string'),
 ('address', 'string'),
 ('age', 'int'),
 ('create_date', 'timestamp'),
 ('update_date', 'timestamp')]

In [12]:
# Preview the converted rows (up to 20 rows, long cell values truncated).
df_converted_type_data.show(n=20, truncate=True)

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



In [13]:
# Disabled because the timestamp gets cropped (truncated) on export
# df_converted_type_data.toPandas().to_csv(data_filename_csv_product_export,index=False)
# df_converted_type_data.toPandas().to_parquet(data_filename_parquet_product_export,index=False)

In [14]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()