##### About

This is a version of adhoc_exchange_rates.ipynb that does not depend on the datautils submodule. It should only be used in a pinch, as it may not be up to date with the latest version of the functions from datautils.

In [38]:
#import all necessary libraries
import json
from ExchangeRateEvent import *
from get_currencies import *
from constants import API_URL, BASE_CURRENCY, LOCAL_CURRENCIES, SALESFORCE_INSTANCE_URL, SALESFORCE_API_USER, SALESFORCE_API_PASS, SALESFORCE_API_TOKEN, DB_URL, API_STATUS_CODES
import logging
import os
import sys
from datetime import timedelta, date, datetime
import pandas as pd
from file_opener import *
from simple_salesforce import Salesforce, bulk2, format_soql
from dateutil.parser import parse
import urllib3
from urllib3 import util
import requests
import re
import csv
from numpy import float64, int64, dtype, nan


In [39]:
#define custom functions
def file_opener(file, mode = 'r'):
    if file:
        open_file = open(file)
        read_file = open_file.read()
        open_file.close()
    return read_file


    #Salesforce reference of data types and the corresponding pandas dtype
#https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/field_types.htm
DTYPE_MAPPER = {'objecting': 'object'
                ,'double': 'float64'
                ,'boolean': 'bool'
                ,'currency': 'float64'
                ,'textarea': 'object'
                ,'date': 'date'
                ,'datetime': 'datetime'
                ,'id': 'object'
                ,'masterrecord': 'object'
                ,'reference': 'object'
                ,'email': 'object'
                ,'picklist': 'object'
                ,'phone': 'object'
                ,'percent': 'float64'
                ,'location': 'object'
                ,'address': 'object'
                ,'string': 'object'
                ,'url': 'object'}

class sf_connect:
    def __init__(self):
        
        #Create the tuple of environment variables to search for
        env_vars = ('SALESFORCE_INSTANCE_URL'
                    ,'SALESFORCE_API_USER'
                    ,'SALESFORCE_API_PASS'
                    ,'SALESFORCE_API_TOKEN')
        
        #Create an empty dict for the credentials
        credentials = {}
        
        #Loop through the evironment variables and add them to the credential dict if they exist
        #otherwise raise an exception.
        for e in env_vars:
            try:
                credentials.update({e: os.environ[e]})
            except:
                raise Exception('Environment Variable {} does not exist, please set this value.'.format(e))
        
        #Set the credentials of the class
        self.__credentials = credentials
        
        #Connect to Salesforce using the credentials from above
        self.salesforce = Salesforce(instance = self.__credentials.get('SALESFORCE_INSTANCE_URL')
                                    ,username = self.__credentials.get('SALESFORCE_API_USER')
                                    ,password = self.__credentials.get('SALESFORCE_API_PASS')
                                    ,security_token = self.__credentials.get('SALESFORCE_API_TOKEN')
                                    ,session_id = requests.Session())
        
    def get_sf_object(self, query):
        if query:
            self.query = format_soql(query)

            #Create the search expression: the "from" statement, with spaces and at least one of any letter or digit.
            #The purpose is to extract the Salesforce Object
            search_object = re.compile('from\s{1,}\w{1,}(\s{0,}|$)', re.IGNORECASE)
            
            #Create the replacement expression, we want to replace the from and any space characters
            replace = re.compile('(from|\s)', re.IGNORECASE)

            #Search the input query
            result = re.search(search_object, self.query)

            try:
                #Replace the objectings to only get the object.
                self.sf_object = re.sub(replace, '', result[0])
                #Return the salesforce object.
                #return sf_object
            except:
                raise Exception('No salesforce object found in query. The result is empty, please check your query')
            
    def sf_query_object(self, query):

        #Add the salesforce object attribute from the query
        self.get_sf_object(query)
        
        try:
            #Create the connections to the appropriate endpoints
            self.api_object = self.salesforce.__getattr__(self.sf_object)
            self.bulk2_object = self.salesforce.bulk2.__getattr__(self.sf_object)

            #Create a dict of column names and the salesforce data types
            from_dtypes = {c.get('name'): c.get('type') for c in self.api_object.describe().get('fields')}
    
            #Connect to the object via bulk2
            print('Querying data from Salesforce for the {} object...'.format(self.sf_object))
            results = self.bulk2_object.query(self.query)
            print('Query completed.')
        #Otherwise raise any exceptions from the Salesforce class or otherwise
        except Exception as e:
            raise(e)

        print('Parsing query results...')    
        csv_data = [r for r in results]

        df_list = []

        #Iterate through the list of lists
        for c in csv_data:
            #Split each list by the newline characters
            newline_split = c.split('\n')
            #Create a csv reader for all rows except the header, explicitly delimit by a comma
            reader = csv.reader(newline_split[1:], delimiter = ',')
            #Create a csv reader for only the header, explicitly delimit by a comma
            col_reader = csv.reader([newline_split[0]], delimiter = ',')
            #Append a dataframe with the values and columns to the df_list
            df_list.append(pd.DataFrame([row for row in reader if row], columns = [c for c in col_reader]))
        
        if df_list:
            print('Converting results to dataframe...')
            #Concatenate the df_list
            self.data = pd.concat(df_list)
            #In some cases the columns may end up as a multi-index, reset them to just an index
            self.data.columns = self.data.columns.get_level_values(0)
            #Map these data types to the appropriate pandas data types
            self.to_dtypes = {k: DTYPE_MAPPER.get(v) for k,v in from_dtypes.items() if k in self.data.columns}

            #Create a dictionary of the existing dtypes
            dtypes_dict = self.data.dtypes.apply(lambda x: x.name).to_dict()

            #Loop through each column, compare the dtypes and change them if appropriate
            print('Converting data types...')
            for c in self.data.columns:               
                to_dtype = self.to_dtypes.get(c)
                #If the datatypes are not equal follow the specified procedures
                #This currently fails for cross-referenced objects, like referencing Campaign.Name when quering CampaignMembers
                if (to_dtype != dtypes_dict.get(c)) and (to_dtype is not None):
                    print('Converting column {} to {}...'.format(c, to_dtype))   
                    #If the to_dtype is a date then convert the column to a datetime.date
                    if to_dtype == 'date':
                        self.data[c] = self.data.apply(lambda x: datetime.strptime(x[c], '%Y-%m-%d').date(), axis = 1)
                    #Else if the to_dtype is a datetime then convert the column to a datetime.datetime
                    elif to_dtype == 'datetime':
                        self.data[c] = self.data.apply(lambda x: datetime.strptime(x[c], '%Y-%m-%dT%H:%M:%S.%f%z'), axis = 1)
                    #Else if the to_dtype is a boolean then convert the strings to a boolean
                    elif to_dtype == 'bool':
                        self.data[c] = self.data.apply(lambda x: True if x[c] == 'true' else (False if x[c] == 'false' else nan), axis = 1)
                    #Else use the astype method for conversion as it functions the same for the other dtypes
                    else:
                        self.data[c] = self.data[c].astype(dtype(to_dtype))

                else:
                    pass

  search_object = re.compile('from\s{1,}\w{1,}(\s{0,}|$)', re.IGNORECASE)
  replace = re.compile('(from|\s)', re.IGNORECASE)


In [40]:
#set up parameters
API_KEY = os.environ['API_KEY']
file_date = datetime.datetime.now().strftime('%Y%m%d')

AttributeError: type object 'datetime.datetime' has no attribute 'datetime'

In [41]:
#load in SOQL query used to pull all relevant donations
donation_soql = file_opener('opportunity.soql')

In [42]:
#connect to the Salesforce API using the sf_connect.py file in the data_utils submodule
sf = sf_connect()

In [43]:
#run the SOQL query using the API
sf.sf_query_object(donation_soql)

Querying data from Salesforce for the Opportunity object...
Query completed.
Parsing query results...
Converting results to dataframe...
Converting data types...
Converting column Amount to float64...
Converting column CloseDate to date...
Converting column IsClosed to bool...
Converting column IsWon to bool...
Converting column Incorrect_Currency_Conversion__c to bool...
Converting column Local_Currency_Amount_Number_For_Nat__c to float64...


In [44]:
#save results of query as a local dataframe
sf_donations_df = sf.data

In [45]:
#set up a mapping to rename the Salesforce fields locally to match CurrencyAPI field names;
renamer = {'Id': 'opportunity_id'
           ,'Currency__c': 'local_currency'
           ,'Local_Currency_Amount_Number_For_Nat__c': 'local_currency_amount'
           ,'CloseDate': 'exchange_rate_date'}

In [46]:
#apply the field-renaming mapping to the dataframe, and save a list of unique donation dates locally (to help pull the minimum amount of data from the CurrencyAPI)
if len(sf_donations_df) > 0:
    sf_donations_df.rename(columns = renamer, inplace = True)
    currencies_dates = sf_donations_df[['local_currency', 'exchange_rate_date']].copy().drop_duplicates()

In [47]:
#print how many donations will need converting:
print('The Salesforce query returned a dataframe with {} records.'.format(len(sf_donations_df)))

The Salesforce query returned a dataframe with 3164 records.


In [None]:
#get the exchange rate for each donation from the Currency API using the get_currencies.py file

if len(currencies_dates) > 0:
    records = []
    for c in currencies_dates.iterrows():
        data = c[1]
        local_currency = data[0]
        date = data[1].strftime('%Y%m%d')

        exchange_rate = get_currencies(
                                       BASE_CURRENCY
                                       ,local_currency
                                       ,date
                                       ,API_KEY
                                       ,API_URL
                                       )
        records.append(exchange_rate)

In [None]:
#save results
data = [rows.__dict__ for days in records for rows in days]

In [None]:
#format results as a dataframe
if len(data) > 0:
    all_exchange_rates = pd.DataFrame(data)
    all_exchange_rates['exchange_rate_date'] = all_exchange_rates.apply(lambda x: parse(x['exchange_rate_date']).date(), axis = 1)

In [None]:
#save results as a CSV
if len(all_exchange_rates) > 0 and len(sf_donations_df) > 0:
    conversion = sf_donations_df.merge(all_exchange_rates[['local_currency', 'exchange_rate_date', 'exchange_rate']]
                                       ,how = 'left'
                                       ,on = ['local_currency', 'exchange_rate_date']
                                       ,indicator = True)
    
    conversion['base_currency_amount'] = conversion.apply(lambda x: round(x['local_currency_amount']/x['exchange_rate'], 2), axis = 1)
    
    print('{} records were successfuly converted.'.format(len(conversion[conversion['_merge'] == 'both'])))
    
    conversion[conversion['_merge'] == 'both'].to_csv('currency_conversion_{}.csv'.format(file_date))
    
    #conversion[conversion['_merge'] != 'both'].to_csv('no_currency_conversion_{}.csv'.format(file_date))