Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


# Important
This content is intended for educational and informational purposes only.

In [None]:
#@title Install and import dependencies {display-mode: "form"}
%%capture
!pip install --upgrade google-cloud-storage

from __future__ import division
from __future__ import print_function

import csv
import datetime
import http
import http.client
import logging
import random
import urllib.parse

from typing import Dict

from google.cloud import storage
from google.colab import auth

In [None]:
#@title Configure required variables
# Project passed to storage.Client() when the import runs.
GCP_PROJECT_ID = "" #@param {type:"string"}
# Bucket that holds the CSV files to import.
GCS_BUCKET_NAME = "" #@param {type:"string"}
# Only blobs whose names start with this prefix are listed and processed.
GCS_SOURCE_PREFIX = "" #@param {type:"string"}
# Prepended (followed by '/') to a blob's name once it has been processed.
GCS_DESTINATION_FOLDER = "" #@param {type:"string"}
# Sent as the `tid` parameter of every hit.
GA_TRACKING_ID = "" #@param {type:"string"}

## Map CSV columns to GA Measurement Protocol parameters

### Add every CSV column, together with its corresponding Measurement Protocol parameter, in the following format:

 ```
variable_mapping = {
 'measurement_protocol_param_name_1': 'csv_column_title_1',
 'measurement_protocol_param_name_2': 'csv_column_title_2',
 .
 .
 .
}
 ```

In [None]:
# Keys are Measurement Protocol parameter names; values are the titles of the
# CSV columns whose contents are sent under that parameter. The six keys below
# are the ones validate_mapping() requires.
variable_mapping = {
    'qt': 'order_date',
    'cid': 'user_id',
    'ti': 'order_id',
    'in': 'name',
    'ip': 'price',
    'iq': 'quantity'}

In [None]:
#@title Authenticate to Google Cloud Storage
# Must run before the import cell, which creates a Cloud Storage client.
auth.authenticate_user()

In [None]:
#@title Import conversions
# Local file each blob is downloaded to before it is parsed; it is overwritten
# for every blob.
TEMP_FILE_PATH = 'tmp_blob_file'
# Hits whose computed queue time reaches this value are skipped, not sent.
MAX_QUEUE_TIME_OFFSET = 1000 * 60 * 60 * 4  # 4 hours in milliseconds

# Connection created on the first send_hit() call and reused afterwards.
_http_client = None
# Measurement Protocol parameters that variable_mapping must contain.
_required_variables = ('qt', 'cid', 'ti', 'in', 'ip', 'iq')

def main():
  """Imports every CSV blob under the source prefix into Google Analytics.

  The mapping is validated first. Each matching blob is then read row by row:
  every row is folded into its transaction summary and sent as an item hit,
  and the blob is renamed once all of its rows are handled. Transaction hits
  are sent only after every blob has been processed.
  """
  validate_mapping()

  client = storage.Client(project=GCP_PROJECT_ID)
  source_bucket = client.get_bucket(GCS_BUCKET_NAME)

  # Transaction summaries keyed by transaction id, shared across all blobs.
  pending_transactions = {}

  for blob in source_bucket.list_blobs(prefix=GCS_SOURCE_PREFIX):
    for row in read_file(blob):
      process_transaction(pending_transactions, row)
      process_item(row)

    move_file(source_bucket, blob)

  send_transaction_hits(pending_transactions)


def validate_mapping():
  """Checks that every required Measurement Protocol parameter is mapped.

  Raises:
    AssertionError: If a name listed in `_required_variables` is not a key of
      `variable_mapping`. The error is raised explicitly rather than through
      an `assert` statement so the check still runs when Python is started
      with optimizations enabled (-O).
  """
  for req_var in _required_variables:
    if req_var not in variable_mapping:
      raise AssertionError('The variable %s must be mapped' % req_var)


def list_files(bucket=None, prefix=None):
  """Lists the blobs stored in a bucket.

  Args:
    bucket: Bucket object to list. When omitted, the bucket named by
      GCS_BUCKET_NAME is fetched with a client for GCP_PROJECT_ID.
    prefix: Optional name prefix used to filter the listing. None lists every
      blob in the bucket.

  Returns:
    Whatever `bucket.list_blobs` returns for the given prefix.
  """
  if bucket is None:
    # No bucket name is in scope inside this function, so build one from the
    # notebook configuration instead of failing with a NameError.
    bucket = storage.Client(project=GCP_PROJECT_ID).get_bucket(GCS_BUCKET_NAME)

  return bucket.list_blobs(prefix=prefix)


def process_transaction(transactions: Dict[str, Dict], line: Dict[str, str]):
  """Folds one CSV row into the summary of the transaction it belongs to.

  A summary holds the accumulated revenue, the user id taken from the first
  row seen for the transaction, and the latest order date seen so far.

  Args:
    transactions: Summaries keyed by transaction id. Updated in place; a new
      entry is created the first time a transaction id is seen.
    line: One CSV row, keyed by column title.

  Raises:
    KeyError: If a mapped column is missing from `line`.
    ValueError: If the date, price or quantity cannot be parsed.
  """
  transaction_id = line[variable_mapping['ti']]
  user_id = line[variable_mapping['cid']]

  if transaction_id not in transactions:
    transactions[transaction_id] = {
        'revenue': 0.0,
        'user_id': user_id,
        'order_date': datetime.datetime.min
    }
  transaction = transactions[transaction_id]

  row_date = datetime.datetime.strptime(
      line[variable_mapping['qt']], '%Y-%m-%d %H:%M:%S.%f')

  # Keep the most recent date among the rows of the transaction.
  if transaction['order_date'] < row_date:
    transaction['order_date'] = row_date

  # The column mapped to `ip` is a per-unit price, so a row contributes
  # price * quantity to the transaction total, not just the unit price.
  unit_price = float(line[variable_mapping['ip']])
  quantity = float(line[variable_mapping['iq']])
  transaction['revenue'] += unit_price * quantity


def process_item(line: Dict[str, str]):
  """Sends one CSV row to Google Analytics as an item hit.

  Nothing is sent when `format_parameters` returns an empty or None value for
  the row.

  Args:
    line: One CSV row, keyed by column title.
  """
  item_parameters = format_parameters(line)
  if not item_parameters:
    return

  send_hit('v=1&t=item&tid=%s&%s' % (GA_TRACKING_ID, item_parameters))


def read_file(blob: storage.blob.Blob):
  """Yields the rows of a CSV blob as dictionaries keyed by column title.

  The blob is first downloaded to TEMP_FILE_PATH and then parsed from there.
  Because this is a generator, the download happens when the first row is
  requested, not when the function is called.

  Args:
    blob: The blob to download and parse.

  Yields:
    One dictionary per CSV row, as produced by `csv.DictReader`.
  """
  save_temp_blob(blob)
  # NOTE(review): the csv module documentation recommends opening files with
  # newline=''; confirm whether input fields can contain embedded newlines.
  with open(TEMP_FILE_PATH, 'r') as csv_file:
    yield from csv.DictReader(csv_file)


def save_temp_blob(blob: storage.blob.Blob):
  """Downloads a blob into the local file at TEMP_FILE_PATH.

  The file is opened in binary write mode, so any content left by a previous
  download is replaced.

  Args:
    blob: The blob whose content is downloaded.
  """
  with open(TEMP_FILE_PATH, 'wb') as local_copy:
    blob.download_to_file(local_copy)


def format_parameters(line: Dict[str, str]):
  """Builds the URL-encoded query string for one CSV row.

  Every entry of `variable_mapping` becomes one `param=value` pair. The `qt`
  entry is converted from a date into a queue time in milliseconds; every
  other value is taken from the row unchanged. Pairs are joined with a single
  '&' and values are percent-encoded, so the result is well formed whatever
  the order of the mapping and whatever characters the values contain.

  Args:
    line: One CSV row, keyed by column title.

  Returns:
    The query string without a leading or trailing '&', or None when the row's
    queue time reaches MAX_QUEUE_TIME_OFFSET and the hit must be skipped.

  Raises:
    AssertionError: If a mapped column is missing from `line`. Raised
      explicitly so the check is not stripped when Python runs with -O.
    ValueError: If the date mapped to `qt` cannot be parsed.
  """
  pairs = []
  for param, column in variable_mapping.items():
    if column not in line:
      raise AssertionError(
          'Invalid variable mapping. Missing column %s' % column)

    if param == 'qt':
      transaction_date = datetime.datetime.strptime(
          line[column], '%Y-%m-%d %H:%M:%S.%f')
      queue_time = calculate_queue_time(transaction_date)
      if queue_time >= MAX_QUEUE_TIME_OFFSET:
        logging.warning('Transaction date older than 4 hours. Ignoring hit')
        return None

      value = str(queue_time)
    else:
      value = line[column]

    pairs.append((param, value))

  # quote_via=quote encodes spaces as %20 instead of '+'.
  return urllib.parse.urlencode(pairs, quote_via=urllib.parse.quote)


def calculate_queue_time(date: datetime.datetime):
  """Returns the distance between `date` and now, in whole milliseconds.

  The comparison uses `datetime.datetime.now()` without a timezone, so `date`
  must be a naive datetime as well. The absolute value is returned, which
  means a date in the future yields the same result as a date equally far in
  the past.

  Args:
    date: The naive datetime to compare against the current local time.

  Returns:
    The non-negative difference in milliseconds, truncated to an int.
  """
  milliseconds_diff = (date - datetime.datetime.now()).total_seconds() * 1000
  return int(abs(milliseconds_diff))


def send_transaction_hits(transactions: Dict[str, Dict]):
  """Sends one transaction hit per accumulated transaction.

  A transaction is skipped, with a warning, when the queue time computed from
  its order date reaches MAX_QUEUE_TIME_OFFSET.

  Args:
    transactions: Summaries keyed by transaction id, each holding the
      'user_id', 'revenue' and 'order_date' entries.
  """
  for order_id, summary in transactions.items():
    if calculate_queue_time(summary['order_date']) >= MAX_QUEUE_TIME_OFFSET:
      logging.warning('Transaction date older than 4 hours. Ignoring hit')
      continue

    # NOTE(review): the queue time is only used for the check above; the
    # payload carries no `qt` parameter - confirm this is intended.
    hit_fields = (
        GA_TRACKING_ID, order_id, summary['user_id'], summary['revenue'])
    send_hit(
        'v=1&t=transaction&tid=%s&ti=%s&cid=%s&tr=%.2f&sc=end' % hit_fields)


def send_hit(payload: str):
  """Posts one hit to the Google Analytics /collect endpoint.

  A random `z` parameter is appended to the payload, the final payload is
  printed, and the request is sent over a module-level HTTPS connection that
  is created on first use and reused by later calls.

  Args:
    payload: The hit parameters as a query string, without the `z` parameter.
  """
  global _http_client

  if not _http_client:
    _http_client = http.client.HTTPSConnection('www.google-analytics.com')

  payload += '&z=%s' % str(random.randrange(100000000000, 999999999999))

  print(payload)

  # Encode explicitly: a str body would be encoded as ISO-8859-1 by
  # http.client and fail on characters outside that charset.
  _http_client.request(
      'POST', '/collect', body=payload.encode('utf-8'), headers={
          'User-Agent': ('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                         '(KHTML, like Gecko) Chrome/76.0.3809.132 '
                         'Safari/537.36')
      })
  response = _http_client.getresponse()
  # The body must be consumed before the connection can carry another request.
  response.read()

  if response.status != 200:
    logging.error('An error has occurred during hit post')
  else:
    logging.debug('Hit sent successfully')


def move_file(bucket: storage.bucket.Bucket, file: storage.blob.Blob):
  """Renames a processed blob so it sits under GCS_DESTINATION_FOLDER.

  The new name is the destination folder, a '/', and the blob's current name.

  Args:
    bucket: The bucket that contains the blob.
    file: The blob to rename.
  """
  destination_name = '%s/%s' % (GCS_DESTINATION_FOLDER, file.name)
  renamed_blob = bucket.rename_blob(file, destination_name)
  logging.debug('File {} moved to {}'.format(file.name, renamed_blob.name))


# Run the import as soon as this cell is executed.
main()

v=1&t=item&tid=UA-138985750-1&qt=9517399&&cid=f681cef5-7b4c-469e-a178-0b1442c17b5a&ti=128940&in=Black Vans T-Shirt&ip=9.90&iq=1&z=597129382998
v=1&t=item&tid=UA-138985750-1&qt=9517419&&cid=f681cef5-7b4c-469e-a178-0b1442c17b5a&ti=128940&in=Red Nike Shoes&ip=39.90&iq=1&z=812883007270
v=1&t=item&tid=UA-138985750-1&qt=9517421&&cid=f681cef5-7b4c-469e-a178-0b1442c17b5a&ti=128940&in=Jeans Calvin Klein Pants&ip=59.90&iq=1&z=232147591534
v=1&t=transaction&tid=UA-138985750-1&ti=128940&cid=f681cef5-7b4c-469e-a178-0b1442c17b5a&tr=109.70&sc=end&z=384087395714
