In [8]:
import awswrangler as wr
from datetime import datetime
import pandas as pd

In [11]:
raw_s3_bucket = "dfncodetestbucket"
raw_path_dir = "original_data/attribution_data"
raw_path = f"s3://{raw_s3_bucket}/{raw_path_dir}"

standardized_s3_bucket = "dfncodetestbucket" 
standard_path_dir = "converted_data/attribution_data/"
standardized_path = f"s3://{standardized_s3_bucket}/{standard_path_dir}"

In [12]:
header = ['partner', 'campaign', 'server_datetime', 'tracker_id', 'log_id', 'attribution_type', 'identity_adid']

df = wr.s3.read_csv(raw_path, names=header)
df.head()

Unnamed: 0,partner,campaign,server_datetime,tracker_id,log_id,attribution_type,identity_adid
0,,,2018-05-03 07:19:24.813,,bdb8fc95-4f66-4d1d-8186-d10e86fe6433,0,764796223
1,,,2018-05-03 10:25:11.034,,67c41325-a700-4f98-ad72-108025e9af8d,0,2126194985
2,,,2018-05-03 10:26:08.081,,0e41af66-3f17-4bde-91db-806296209ad1,0,738518810
3,,,2018-05-03 22:38:15.378,,a5f7ed1f-5d4e-4adf-96ce-e94f6820c2c2,0,595719449
4,,,2018-05-03 04:14:55.453,,1e1aae33-282d-4dc2-9267-22fbd4ee2798,0,302402748


In [13]:
df = df.astype({"partner" : "str", 
                "campaign" : "str",
                "tracker_id" : "str",
                "log_id" : "str",
                "attribution_type" : "Int64",
                "identity_adid" : "str"
                })

In [14]:
df["server_datetime"] = pd.to_datetime(df["server_datetime"], errors = 'coerce')

In [15]:
df['server_datetime'] = df["server_datetime"].dt.strftime("%Y-%m-%d")

In [16]:
df.head()

Unnamed: 0,partner,campaign,server_datetime,tracker_id,log_id,attribution_type,identity_adid
0,,,2018-05-03,,bdb8fc95-4f66-4d1d-8186-d10e86fe6433,0,764796223
1,,,2018-05-03,,67c41325-a700-4f98-ad72-108025e9af8d,0,2126194985
2,,,2018-05-03,,0e41af66-3f17-4bde-91db-806296209ad1,0,738518810
3,,,2018-05-03,,a5f7ed1f-5d4e-4adf-96ce-e94f6820c2c2,0,595719449
4,,,2018-05-03,,1e1aae33-282d-4dc2-9267-22fbd4ee2798,0,302402748


In [17]:
df.dtypes

partner             object
campaign            object
server_datetime     object
tracker_id          object
log_id              object
attribution_type     Int64
identity_adid       object
dtype: object

In [18]:
partition = ['server_datetime']
wr.s3.to_parquet(df, path=standardized_path, dataset=True, partition_cols=partition)

{'paths': ['s3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-02/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-03/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-04/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-05/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-06/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-07/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attribution_data/server_datetime=2018-05-08/ecb5306fc9f3434985beaea0a1926ba9.snappy.parquet',
  's3://dfncodetestbucket/converted_data/attri

In [23]:
wr.catalog.create_parquet_table(
    database = "dfncodetestdb",
    table = "attribution_data_table",
    path=standardized_path,
    columns_types = {"partner" : "string", 
                    "campaign" : "string",
                    "tracker_id" : "string",
                    "log_id" : "string",
                    "attribution_type" : "int",
                    "identity_adid" : "string"
                    },
    partitions_types = {'server_datetime' : "string"},
    compression = 'snappy',
    description = "test",
    columns_comments = {"partner" : "partner name", 
                        "campaign" : "campaign name",
                        'server_datetime' : "server datetime",
                        "tracker_id" : "tracker ids",
                        "log_id" : "log ids",
                        "attribution_type" : "types of attribution",
                        "identity_adid" : "id"
                        })

In [24]:
wr.athena.repair_table(database = "dfncodetestdb",
                       table = "attribution_data_table",
                       s3_output = standardized_path)

'SUCCEEDED'

In [25]:
raw_s3_bucket = "dfncodetestbucket"
raw_path_dir = "original_data/event_data"
raw_path = f"s3://{raw_s3_bucket}/{raw_path_dir}"

standardized_s3_bucket = "dfncodetestbucket" 
standard_path_dir = "converted_data/event_data/"
standardized_path = f"s3://{standardized_s3_bucket}/{standard_path_dir}"

In [26]:
header = ['identity_adid', 'os', 'model', 'country', 'event_name', 'log_id', 'server_datetime', 'quantity', 'price']

df = wr.s3.read_csv(raw_path, names=header)
df.head()

Unnamed: 0,identity_adid,os,model,country,event_name,log_id,server_datetime,quantity,price
0,984549936,8.9,8.9,jp,abx:login,c21efdb8-b6e5-4ccc-a474-aff72a62c248,2018-05-18 12:23:15.303,,
1,885033552,8.9,8.9,gb,abx:login,b4470f3b-4bb9-43ef-9248-25b503fa5660,2018-05-18 12:32:46.395,,
2,768602461,7.1,7.1,ge,abx:firstopen,372dfecc-a27f-4a16-8e31-eccf34b8855f,2018-05-18 12:34:55.196,,
3,1666798466,3.4,3.4,gb,abx:end_session,08730bdc-2895-4061-8399-f45df94d3fd0,2018-05-18 12:30:23.945,,
4,683694696,7.1,7.1,kr,abx:start_session,a9556df7-f6ee-4600-af5b-89a44f18673c,2018-05-18 12:31:14.824,,


In [27]:
df = df.astype({"identity_adid" : "str", 
                "os" : "str",
                "model" : "str",
                "country" : "str",
                "event_name" : "str",
                "quantity" : "Int64",
                "price" : "float"
                })

In [28]:
df["server_datetime"] = pd.to_datetime(df["server_datetime"], errors = 'coerce')

In [29]:
df['server_datetime'] = df["server_datetime"].dt.strftime("%Y-%m-%d")

In [30]:
df.head()

Unnamed: 0,identity_adid,os,model,country,event_name,log_id,server_datetime,quantity,price
0,984549936,8.9,8.9,jp,abx:login,c21efdb8-b6e5-4ccc-a474-aff72a62c248,2018-05-18,,
1,885033552,8.9,8.9,gb,abx:login,b4470f3b-4bb9-43ef-9248-25b503fa5660,2018-05-18,,
2,768602461,7.1,7.1,ge,abx:firstopen,372dfecc-a27f-4a16-8e31-eccf34b8855f,2018-05-18,,
3,1666798466,3.4,3.4,gb,abx:end_session,08730bdc-2895-4061-8399-f45df94d3fd0,2018-05-18,,
4,683694696,7.1,7.1,kr,abx:start_session,a9556df7-f6ee-4600-af5b-89a44f18673c,2018-05-18,,


In [31]:
df.dtypes

identity_adid       object
os                  object
model               object
country             object
event_name          object
log_id              object
server_datetime     object
quantity             Int64
price              float64
dtype: object

In [32]:
partition = ['server_datetime']
wr.s3.to_parquet(df, path=standardized_path, dataset=True, partition_cols=partition)

{'paths': ['s3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-01/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-02/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-03/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-04/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-05/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-06/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-07/701e4b51fe7d48059b5292cb79d66eef.snappy.parquet',
  's3://dfncodetestbucket/converted_data/event_data/server_datetime=2018-05-08/701e4b51f

In [35]:
wr.catalog.create_parquet_table(
    database = "dfncodetestdb",
    table = "event_data_table",
    path=standardized_path,
    columns_types = {"identity_adid" : "string",
                     "os" : "string", 
                    "model" : "string",
                    "country" : "string",
                    "event_name" : "string",
                    "quantity" : "int",
                    "price" : "double"
                    },
    partitions_types = {'server_datetime' : "string"},
    compression = 'snappy',
    description = "test",
    columns_comments = {"identity_adid" :
                        "id",
                         "os" : "os type", 
                        "model" : "model type",
                        "country" : "name of country",
                        "event_name" : "event name",
                        "quantity" : "counts of purchase",
                        "price" : "amount of purchase"
                        })

In [36]:
wr.athena.repair_table(database = "dfncodetestdb",
                       table = "event_data_table",
                       s3_output = standardized_path)

'SUCCEEDED'