# Data wrangling of digital assets
### Data Engineering Capstone Project

#### Project Summary
This project leverages financial and social data related to the cryptocurrency industry. The idea is to prepare the data so that it can be used, for example, to explore the relationship between price trends of cryptocurrency assets (e.g. Bitcoin) and their sentiment on social media platforms. 
To do so, structured financial market data (e.g. the Bitcoin price over a year) as well as unstructured social media data (e.g. Bitcoin tweets on Twitter) are extracted from the source platforms (e.g. a cryptocurrency exchange or Twitter). Then the data are transformed (e.g. duplicates are removed and tables are split into a star schema). Finally, the prepared data are loaded into the target system (e.g. a filesystem).

The following steps are executed as part of the ETL process:
* 1. Extract and transform data from the cryptocurrency exchange
    * 1.1 Extract trading data
    * 1.2 Transform trading data to marketdata
* 2. Extract and transform social data from Twitter
    * 2.1 Extract tweets from Twitter and persist original tweets on filesystem
    * 2.2 Extract original tweets from filesystem
    * 2.3 Transform original tweets
* 3. Run quality checks on tweets and marketdata
* 4. Persist tweets and marketdata on filesystem
    

#### Libraries
In this step the necessary libraries are imported. Note that the TwitterAPI wrapper has to be installed before the Twitter API package can be imported, by running the "pip install TwitterAPI" command in the bash console.  

In [1]:
## Import dependencies

%load_ext sql

import pandas as pd
import requests
import datetime
import time
import json
import csv

from urllib.parse import urljoin
from functools import reduce

from pyspark.sql.types import *
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql.functions import year, month, dayofweek, hour, weekofyear, date_format, \
                                  udf, col, lit, struct, isnan, count, when

from TwitterAPI import TwitterAPI

#### Spark environment
As the application is running locally in a Jupyter notebook, the Spark session has to be instantiated manually. If the application is executed in an environment that provides a Spark kernel (e.g. Amazon EMR), this step is not needed.

In [2]:
## Make spark environment available.

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .appName("ETL_CryptocurrencyInsight") \
    .getOrCreate()

### 1. Extract and transform data from cryptocurrency exchange
In this step the trading data are extracted (1.1) and transformed (1.2). As part of the extraction, the trading data for Bitcoin, Ethereum and XRP are fetched from the cryptocurrency exchange. Please note that the trading details refer to the "daily close", i.e. aggregated data for one day of trading. 
#### 1.1 Extract trading data 
To extract the trading data, the public API of the cryptocurrency exchange Binance is used. Binance is one of the most popular exchanges and among those with the highest trading volume. Furthermore, it provides a public API that answers GET requests with trading details, which is why it was selected as the data source. The GET request fetches the daily close (closing price, highest price, etc.) of Bitcoin (BTCUSDT), Ethereum (ETHUSDT) and XRP (XRPUSDT).
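The klines endpoint returns every day as a positional JSON array rather than a named object, so the column names have to be attached on the client side. The following is a minimal sketch of that mapping, assuming the field order documented for Binance's `/api/v3/klines` response; `build_klines_url` and `kline_to_dict` are hypothetical helper names, and the sample row contains made-up values, not real market data.

```python
from urllib.parse import urlencode, urljoin

# Column order of one kline row as described in the Binance REST API docs;
# the last field is unused and is dropped later in the notebook.
KLINE_COLUMNS = [
    "open_time", "open", "high", "low", "close", "volume",
    "close_time", "quote_asset_volume", "number_of_trades",
    "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "unclassified",
]

def build_klines_url(symbol, interval="1d"):
    """Hypothetical helper: compose the GET URL that requests.get() builds from url + params."""
    base = urljoin("https://api.binance.com", "/api/v3/klines")
    return base + "?" + urlencode({"symbol": symbol, "interval": interval})

def kline_to_dict(row):
    """Map one positional kline array to named fields (prices arrive as strings)."""
    if len(row) != len(KLINE_COLUMNS):
        raise ValueError(f"expected {len(KLINE_COLUMNS)} fields, got {len(row)}")
    return dict(zip(KLINE_COLUMNS, row))

# Made-up sample row for illustration only (not real market data)
sample = [1571961600000, "7400.0", "7600.0", "7350.0", "7500.0", "1234.5",
          1572047999999, "9250000.0", 56789, "600.0", "4500000.0", "0"]

record = kline_to_dict(sample)
print(build_klines_url("BTCUSDT"))  # https://api.binance.com/api/v3/klines?symbol=BTCUSDT&interval=1d
print(record["close_time"], record["close"])
```

Because prices and volumes arrive as strings, the notebook casts them explicitly (e.g. to `decimal(20,10)`) instead of relying on schema inference.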

#### 1.2 Transform trading data to marketdata
After the trading data have been extracted successfully, they are transformed to marketdata. As part of the transformation, the data are cleaned by removing rows without an id (null values) as well as duplicates. Finally, the trading data are split into three dataframes following a star-schema approach, holding time-related (e.g. datetime), trade-related (e.g. trading volumes) and symbol-related (e.g. BTCUSDT) data. The closing time serves as the id that relates the dataframes to each other; since all trading pairs share the same daily closing times, this id is unique only per symbol. 
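The closing time arrives as epoch milliseconds and is turned into a readable datetime before the time segments are derived. A minimal sketch of that conversion, assuming UTC as the time zone (the tweet transformation states that UTC applies to the entire project); `ms_to_utc_string` is a hypothetical name.

```python
import datetime

def ms_to_utc_string(ts_ms):
    """Convert epoch milliseconds to a 'YYYY-MM-DD HH:MM:SS' string in UTC."""
    if ts_ms is None:
        return None  # keep nulls as nulls instead of raising inside a UDF
    dt = datetime.datetime.fromtimestamp(ts_ms / 1000.0, tz=datetime.timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

# Example: a close time one millisecond before midnight UTC
print(ms_to_utc_string(1572047999999))  # 2019-10-25 23:59:59
```

Passing `tz=datetime.timezone.utc` matters: without it, `fromtimestamp` uses the local time zone of the machine running the Spark worker, so the same timestamp could map to different days on different machines.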

In [3]:
def extract_tradingdata(symbol):
    """
    Description: Collect the URL and all parameters. The interval is set to 1d to fetch the daily close.
                 No specific headers are needed as it is a public API.
                 Note: To avoid an IP ban, the number of API calls must be < 100 calls/minute.
    Parameters:
        symbol: Trading pair (cryptocurrency asset) for which trading data are requested.
    Returns:
        response: Response of the API call to the cryptocurrency exchange.
    """
    
    baseurl = "https://api.binance.com"
    path = "/api/v3/klines"
    headers = None
    params = {
        'symbol': symbol,
        'interval': '1d'
    }
    url = urljoin(baseurl, path)
    
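    # Request data from the cryptocurrency exchange for the specific symbol;
    # the timeout avoids blocking the notebook indefinitely if the exchange does not respond
    response = requests.get(url, headers=headers, params=params, timeout=30)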
    
    print("Success: Call to crypto currency exchange executed")
    return response
    
def get_tradingdata(symbol):
    """
    Description: The function calls the API of a cryptocurrency exchange to get the trading details
                 for a specific cryptocurrency asset. Afterwards, it transforms the trading data.
    Parameters:
        symbol: Trading pair (cryptocurrency asset) for which trading data are requested.
    Returns:
        df_tradingdata: Spark dataframe containing the trading data matched with the corresponding symbol,
                        or None if the API call was not successful.
    """
    
    # Execute the trading data extraction from datasource
    response = extract_tradingdata(symbol)

    # If the response is successful and data are being fetched prepare dataframe containing marketdata
    if response.status_code == 200:

        # Extract trading details and make them available to the application
        # Define columns for dataframe based on API description from cryptocurrency exchange. 
        # Source: https://github.com/binance-exchange/binance-official-api-docs/blob/master/rest-api.md#general-api-information
        content_tradingdata = response.json()   
        columns_tradingdata = ['open_time', 
                              'open', 
                              'high', 
                              'low', 
                              'close', 
                              'volume', 
                              'close_time', 
                              'quote_asset_volume', 
                              'number_of_trades', 
                              'taker_buy_base_asset_volume', 
                              'taker_buy_quote_asset_volume', 
                              'unclassified']
        df_tradingdata = spark.createDataFrame(content_tradingdata, columns_tradingdata)

        # Overwrite the trading dataframe, adjusting the datatypes and adding a symbol column
        # so that the trading details are assigned to the correct cryptocurrency
        df_tradingdata = df_tradingdata.selectExpr("cast(open_time as long) open_time", 
                                                  "cast(open as decimal(20,10)) open", 
                                                  "cast(high as decimal(20,10)) high", 
                                                  "cast(low as decimal(20,10)) low", 
                                                  "cast(close as decimal(20,10)) close",
                                                  "cast(volume as decimal(20,10)) volume",
                                                  "cast(close_time as long) close_time",
                                                  "cast(quote_asset_volume as decimal(20,10)) quote_asset_volume",
                                                  "cast(number_of_trades as integer) number_of_trades",
                                                  "cast(taker_buy_base_asset_volume as decimal(20,10)) taker_buy_base_asset_volume",
                                                  "cast(taker_buy_quote_asset_volume as decimal(20,10)) taker_buy_quote_asset_volume") \
                                                  .withColumn("symbol", lit(symbol))
        
        print("Success: Trading data were extracted")
        return df_tradingdata

    else:
        # Return None explicitly so that the caller can detect the failed request
        print("Crypto Currency API not available. Please try later")
        return None

def get_marketdata(trading_pair_symbols):       
    """
    Description: Creates a dataframe containing trading data for all requested cryptocurrency assets (marketdata).
    Parameters: 
        trading_pair_symbols: List containing cryptocurrency asset symbols.
    Returns: 
        Spark dataframe combining the trading data of all symbols that could be fetched.
    """
    
    # Collect one trading dataframe per cryptocurrency; skip symbols whose API call failed (None)
    series_tradingdata = []
    for trading_pair_symbol in trading_pair_symbols:
        df_tradingdata = get_tradingdata(trading_pair_symbol)
        if df_tradingdata is not None:
            series_tradingdata.append(df_tradingdata)
    
    if not series_tradingdata:
        raise RuntimeError("No trading data could be fetched from the cryptocurrency exchange")
    
    # Combine the trading dataframes into one (lazy; union resolves columns by position)
    return reduce(DataFrame.union, series_tradingdata)

def transform_marketdata(df_marketdata):       
    """
    Description: Process marketdata and split it into three tables following a star schema approach.
                 The fact table (df_marketdata_trade) contains trading metrics. The time table 
                 (df_marketdata_time) is descriptive and contains the related trading times. The symbol 
                 table (df_marketdata_symbol) contains the symbols (trading pairs). The relation between the
                 dataframes is described by the closing time (id), which is unique per symbol.
    Parameters: 
        df_marketdata: Contains all marketdata as result of the data extraction
    Returns: 
        Tuple (df_marketdata_time, df_marketdata_trade, df_marketdata_symbol)
    """
    # @udf: Convert the timestamp (in ms) to a datetime string in UTC, the time zone used for the entire project
    get_datetime_udf = udf(lambda ts: datetime.datetime.fromtimestamp(ts / 1000.0, tz=datetime.timezone.utc)
                                                       .strftime("%Y-%m-%d %H:%M:%S") if ts is not None else None)
    
    # Rename the closing time to id.
    # Note: The closing time acts as id. All trading pairs share the same daily closing times,
    # so the id is unique per symbol only.
    df_marketdata = df_marketdata.withColumnRenamed("close_time", "id")
    
    # Ensure data quality by dropping rows without id and duplicates. The result has to be assigned,
    # and duplicates are identified per (id, symbol) so that different trading pairs are kept.
    df_marketdata = df_marketdata.na.drop(subset=["id"]).dropDuplicates(["id", "symbol"])
    
    # Convert the timestamp to a datetime string
    df_marketdata = df_marketdata.withColumn("datetime", get_datetime_udf(df_marketdata.id))
    
    # Create time table dataframe for marketdata. The id is expressed by the closing time. 
    df_marketdata_time = df_marketdata.select(["id",
                                               month("datetime").alias("month"),
                                               year("datetime").alias("year"),
                                               dayofweek("datetime").alias("weekday"),
                                               weekofyear("datetime").alias("weekofyear"),
                                               "datetime"])
    
    # Create the trade table dataframe containing all relevant trading metrics.
    df_marketdata_trade = df_marketdata.select("id",
                                               "open",
                                               "close",
                                               "high",
                                               "low",
                                               "quote_asset_volume",
                                               "number_of_trades",
                                               "taker_buy_base_asset_volume",
                                               "taker_buy_quote_asset_volume")
    
    # Create the symbol table dataframe relating each id to its trading pair.
    df_marketdata_symbol = df_marketdata.select("id",
                                                "symbol")
    
    print("Success: Marketdata were transformed")
    return df_marketdata_time, df_marketdata_trade, df_marketdata_symbol

## Receive all marketdata for cryptocurrency assets
trading_pair_symbols = ["BTCUSDT", "ETHUSDT", "XRPUSDT"]
df_marketdata = get_marketdata(trading_pair_symbols)

## Split the marketdata into different tables following star schema
df_marketdata_time, df_marketdata_trade, df_marketdata_symbol = transform_marketdata(df_marketdata)

Success: Call to crypto currency exchange executed
Success: Trading data were extracted
Success: Call to crypto currency exchange executed
Success: Trading data were extracted
Success: Call to crypto currency exchange executed
Success: Trading data were extracted
Success: Marketdata were transformed
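All trading pairs close their daily candles at the same timestamp, so `close_time` on its own does not identify a row once several symbols are combined. A minimal pure-Python sketch of why de-duplicating on `id` alone would collapse the trading pairs, while a composite key of symbol and closing time keeps one row per pair and day; `dedupe` is a hypothetical helper and the rows are made up.

```python
def dedupe(rows, key_fields):
    """Keep one row per distinct key. Spark's dropDuplicates keeps an arbitrary one; this keeps the first."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

# Made-up rows: three pairs closing at the same time, plus one exact duplicate
rows = [
    {"id": 1572047999999, "symbol": "BTCUSDT"},
    {"id": 1572047999999, "symbol": "ETHUSDT"},
    {"id": 1572047999999, "symbol": "XRPUSDT"},
    {"id": 1572047999999, "symbol": "BTCUSDT"},
]

print(len(dedupe(rows, ["id"])))            # 1: two trading pairs silently lost
print(len(dedupe(rows, ["id", "symbol"])))  # 3: one row per pair and day

# A single-column surrogate key could combine both parts, e.g. "BTCUSDT_1572047999999"
surrogate_keys = [f"{r['symbol']}_{r['id']}" for r in dedupe(rows, ["id", "symbol"])]
```

In PySpark, the composite variant corresponds to `dropDuplicates(["id", "symbol"])`, and a surrogate key could be built with `concat_ws("_", col("symbol"), col("id").cast("string"))` if the tables should be joinable on a single column.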


### 2. Extract and transform social data from Twitter
In the following steps, tweets about Bitcoin for a specific time period (23.10.2019 to 26.10.2019) are extracted, transformed and loaded to a filesystem.
Because the Twitter API strictly limits both the number of requests and the number of search results, the extracted tweets are stored on the filesystem in JSON format. This step decouples the internal ETL pipeline from the Twitter API. 
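Spark's `DataFrame.write.json` produces newline-delimited JSON (one object per line), which is also the layout `spark.read.json` expects by default. A minimal stdlib sketch of the decoupling idea, writing flattened tweets once and reading them back later without calling the API; the helper names, the file name `part-00000.json` and the sample tweet are made up for illustration.

```python
import json
import os
import tempfile

def write_json_lines(records, path):
    """Append one JSON object per line, similar to Spark's json writer in mode='append'."""
    with open(path, "a", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")

def read_json_lines(path):
    """Read newline-delimited JSON back into a list of dicts."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]

# Made-up flattened tweet, shaped like the dictionary built in step 2.1
tweets = [{"id": 1, "date": "Fri Oct 25 14:03:12 +0000 2019", "text": "bitcoin!",
           "user": "example_user", "favorite_count": 3, "retweet_count": 1,
           "followers_count": 42}]

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "part-00000.json")
    write_json_lines(tweets, target)
    restored = read_json_lines(target)  # no API call needed to re-run later steps
```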

### Excursus Twitter API/App: 
To extract tweets from Twitter, a Twitter app needs to be configured. During the configuration process, the necessary credentials are generated. While setting up the Twitter app, the premium API is selected. The premium API allows Twitter searches over a specific period (e.g. 23.10.2019 to 26.10.2019), whereas the standard API only allows requesting data from the last 7 days. More information on the different Twitter APIs can be found here: https://developer.twitter.com/en/premium-apis.html
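The premium search takes `fromDate`/`toDate` as `YYYYMMDDHHmm` strings, and because only 100 tweets come back per request and the monthly request budget is small, a longer period can be searched one day at a time. A minimal sketch of splitting 23.10.2019 to 26.10.2019 into daily windows; `daily_windows` is a hypothetical helper, and the last window matches the `startDate`/`endDate` set in `get_parameters` below.

```python
import datetime

def daily_windows(start, end):
    """Split [start, end) into consecutive one-day (fromDate, toDate) pairs in YYYYMMDDHHmm format."""
    fmt = "%Y%m%d%H%M"
    windows = []
    current = start
    while current < end:
        nxt = min(current + datetime.timedelta(days=1), end)
        windows.append((current.strftime(fmt), nxt.strftime(fmt)))
        current = nxt
    return windows

windows = daily_windows(datetime.datetime(2019, 10, 23), datetime.datetime(2019, 10, 26))
for from_date, to_date in windows:
    print(from_date, to_date)
# 201910230000 201910240000
# 201910240000 201910250000
# 201910250000 201910260000
```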

#### 2.1 Extract tweets from twitter and persist original tweets on filesystem
To extract Bitcoin tweets, the Twitter Search API is used. Note that the Twitter Search API is not public; therefore, credentials are provided when executing the GET request. Tweets posted between 23.10.2019 and 26.10.2019 are extracted. This period was chosen because the Bitcoin price was extremely volatile at that time.   
After the tweets have been extracted, the nested JSON objects are flattened and persisted on a storage system. As part of the data cleaning, only meaningful tweet attributes are kept. 
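The search results are paginated: a response may carry a `next` token that has to be sent with the following request until a page arrives without one. A minimal sketch of that loop together with the flattening of the nested tweet JSON, using a hypothetical in-memory `fake_search` in place of the real API call so the control flow can run offline; the tweet payloads are made up.

```python
def flatten_tweet(tweet):
    """Keep only the attributes used downstream; user fields live in a nested object."""
    return {
        "id": tweet["id"],
        "date": tweet["created_at"],
        "text": tweet["text"],
        "user": tweet["user"]["screen_name"],
        "favorite_count": tweet["favorite_count"],
        "retweet_count": tweet["retweet_count"],
        "followers_count": tweet["user"]["followers_count"],
    }

def collect_all(search):
    """Call search(next_token) repeatedly until a page without a 'next' token arrives."""
    flattened, token = [], None
    while True:
        page = search(token)
        flattened.extend(flatten_tweet(t) for t in page["results"])
        if "next" not in page:
            return flattened
        token = page["next"]

def make_tweet(i):
    # Made-up payload with the nested shape of a tweet object
    return {"id": i, "created_at": "Fri Oct 25 14:03:12 +0000 2019", "text": f"tweet {i}",
            "favorite_count": 0, "retweet_count": 0,
            "user": {"screen_name": f"user{i}", "followers_count": 10 * i}}

# Hypothetical stand-in for twitterAPI.request(...).json(): two pages keyed by token
PAGES = {None: {"results": [make_tweet(1), make_tweet(2)], "next": "abc"},
         "abc": {"results": [make_tweet(3)]}}

def fake_search(token):
    return PAGES[token]

tweets = collect_all(fake_search)
print(len(tweets), tweets[-1]["user"])  # 3 user3
```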

In [None]:
def setup_twitterAPI():       
    """
    Description:  Calls the twitter api wrapper and provides credentials to it.         
    Parameters: 
        None
    Returns: 
        twitterAPI: The prepared API wrapper used to send requests to the Twitter API
    """
    # Provide all credentials to access the Twitter API (fill in the keys generated for your Twitter app)
    consumer_key = ""
    consumer_secret = ""
    access_token_key = ""
    access_token_secret = ""

    # Get the API from the Twitter API wrapper and provide credentials
    twitterAPI = TwitterAPI(consumer_key, 
                            consumer_secret, 
                            access_token_key, 
                            access_token_secret)
    
    print("Success: Twitter API was configured")
    return twitterAPI
    
def get_parameters():       
    """
    Description: Define all needed parameters to execute a twitter tweet search.
    Parameters: 
        None
    Returns: 
        parameters: Dictionary containing relevant search parameters
    """
    
    # The special operator lang: in the search term requests only English tweets.
    search_term = "bitcoin lang:en"
    
    # Defines the Twitter product being used. The "30day" product gives access to tweets from the last 30 days.
    product = "30day"
    
    # Name of the twitter application defined.
    label = "prototype"
    
    #  The Twitter search begins at the startDate and stops at the endDate. However, the search may stop earlier
    #  if the number of allowed requests is reached (30/minute or 250/month).
    startDate = "201910250000" 
    endDate = "201910260000"
    
    # The nextpage_token is necessary to enable pagination
    nextpage_token = None

    parameters = {"label": label,
                  "product": product,
                  "query": search_term,
                  "fromDate": startDate, 
                  "toDate": endDate,
                  "next": nextpage_token}
    
    print("Success: Parameters for Twitter Api were defined")
    return parameters

def get_twitterdata():       
    """
    Description: Execute the GET request and save the relevant data of each received tweet in a dictionary. 
                 Note: The results and requests of the Twitter API are strictly limited to 30 requests/minute and 
                 250 requests/month. In addition, only 100 tweets are returned per request (page). Therefore, the request needs to be split 
                 (e.g. by date) and executed several times using the next token.
                 Source:          
    Parameters: 
        None
    Returns: 
        cryptocurrency_tweets_pd: Pandas dataframe containing all relevant cryptocurrency tweets for the desired time period
        request_status: True if all pages were fetched successfully, otherwise False
    """
    
    # Make Twitter Api available and define parameters for twitter search
    twitterAPI = setup_twitterAPI()
    parameters = get_parameters()
    
    # As the request is a nested json object containing data which are not needed
    # the relevant details are extracted and persisted in a dictionary 
    cryptocurrency_tweets_dict = {"id": [], 
                                  "date": [], 
                                  "text": [],
                                  "user": [], 
                                  "favorite_count": [], 
                                  "retweet_count": [], 
                                  "followers_count": []}

    # Execute actual twitter search based on search term considering pagination and english language
    # Filter out unnecessary tweet data and create a dictionary containing the relevant data
    while True:
        request = twitterAPI.request("tweets/search/%s/:%s" % (parameters["product"], parameters["label"]),
                                                              {"query": parameters["query"],
                                                               "fromDate": parameters["fromDate"],
                                                               "toDate": parameters["toDate"],
                                                               "next": parameters["next"]})
        # Check the status first: an error response may not contain the expected JSON payload
        if request.status_code != 200:
            print("Error: Twitter API Call was not successful")
            request_status = False
            break
        twitter_tweets = request.json()
        for twitter_tweet in twitter_tweets["results"]:
            cryptocurrency_tweets_dict["id"].append(twitter_tweet["id"])
            cryptocurrency_tweets_dict["date"].append(twitter_tweet["created_at"])
            cryptocurrency_tweets_dict["text"].append(twitter_tweet["text"])
            cryptocurrency_tweets_dict["user"].append(twitter_tweet["user"]["screen_name"])
            cryptocurrency_tweets_dict["favorite_count"].append(twitter_tweet["favorite_count"])
            cryptocurrency_tweets_dict["retweet_count"].append(twitter_tweet["retweet_count"])
            cryptocurrency_tweets_dict["followers_count"].append(twitter_tweet["user"]["followers_count"])
        if "next" not in twitter_tweets:
            print("Success: Twitter API Call executed")
            request_status = True
            break
        parameters["next"] = twitter_tweets["next"]
    
    # To ease up post-processing use pandas to create dataframe containing all relevant cryptocurrency tweets
    cryptocurrency_tweets_pd = pd.DataFrame(cryptocurrency_tweets_dict)
    return cryptocurrency_tweets_pd, request_status

def persist_twitterdata(df_twitterdata, request_status):
    """
    Description: Persist the tweets in JSON format on the filesystem. This is necessary to decouple the pipeline from the Twitter API
                 due to its result and call limitations. If the Twitter API limit is reached, no data are persisted. 
                 In that case, refer to the existing data in the datasource/twitter_tweets folder.
    Parameters: 
        df_twitterdata: Pandas dataframe containing tweets for a specific period in the past
        request_status: True if the request was executed successfully, otherwise False. 
    Returns: 
        None
    """    
    
    if request_status:
        # Convert the pandas dataframe passed in as argument into a Spark dataframe
        cryptocurrency_tweets_df = spark.createDataFrame(df_twitterdata)
        # Write to the folder that step 2.2 (extract_twitterdata) reads from
        cryptocurrency_tweets_df.write.json(path="datasource/twitter_tweets", 
                                            mode="append")
        print("Success: Tweets were written into filesystem")
    else:
        print("Error: Tweets were not written into filesystem")

## Access twitter (30 day premium) api and receive tweets
df_twitterdata, request_status = get_twitterdata()

## Store twitter data on a local filesystem
persist_twitterdata(df_twitterdata, request_status)

### 2. Extract and transform social data from Twitter
In the following two steps, the original tweets that have been stored on the filesystem are extracted and transformed. 

#### 2.2 Extract original tweets from filesystem
Here, the tweets are read from the internal filesystem into a Spark dataframe. 
        
#### 2.3 Transform tweets
Once step 2.2 has been executed, the tweets are cleaned by removing duplicates and rows with a null id. Furthermore, the collection of tweets is split into three dataframes holding user-, content- and time-related data. The dataframes are related to each other by the tweet id.
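Twitter's `created_at` field uses a ctime-like format such as `Fri Oct 25 14:03:12 +0000 2019`. A minimal sketch of the conversion performed by the UDF, plus the time segments the time table derives from it; `to_utc_string` and `time_segments` are hypothetical helpers. The segment helper mimics Spark semantics, assuming `dayofweek` counts 1 = Sunday through 7 = Saturday and `weekofyear` returns the ISO 8601 week number. Unlike the UDF, the sketch normalizes the offset to UTC before dropping it; for tweets delivered with `+0000` the result is identical.

```python
import datetime

TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def to_utc_string(created_at):
    """Parse Twitter's created_at, normalize it to UTC and drop the offset."""
    if created_at is None:
        return None
    parsed = datetime.datetime.strptime(created_at, TWITTER_FORMAT)
    return parsed.astimezone(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

def time_segments(dt_string):
    """Mimic Spark's month(), year(), dayofweek() and weekofyear() on a datetime string."""
    dt = datetime.datetime.strptime(dt_string, "%Y-%m-%d %H:%M:%S")
    return {
        "month": dt.month,
        "year": dt.year,
        "weekday": dt.isoweekday() % 7 + 1,  # Sunday -> 1, ..., Saturday -> 7
        "weekofyear": dt.isocalendar()[1],   # ISO 8601 week number
    }

value = to_utc_string("Fri Oct 25 14:03:12 +0000 2019")
print(value)                 # 2019-10-25 14:03:12
print(time_segments(value))  # {'month': 10, 'year': 2019, 'weekday': 6, 'weekofyear': 43}
```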

In [4]:
def extract_twitterdata():       
    """
    Description: Load twitter data from external storage into a spark dataframe making it available for
                 denormalization
    Parameters: 
        None
    Returns: 
        cryptocurrency_tweets_df: Spark dataframe containing twitter tweets
    """
 
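    # Ensure a proper data structure by providing a custom schema. Note: Spark's file readers treat
    # all fields as nullable, so the non-nullable flag on id is not enforced (see the printed schema);
    # rows without an id have to be filtered out explicitly during the transformation.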
    fields = [StructField("id", LongType(), False),
              StructField("date", StringType(), True),
              StructField("text", StringType(), True),
              StructField("user", StringType(), True), 
              StructField("favorite_count", LongType(), True),
              StructField("retweet_count", LongType(), True),
              StructField("followers_count", LongType(), True)]
                 
    final_schema = StructType(fields)  
    
    # Read twitter data provided as json following the schema definition
    cryptocurrency_tweets_df = spark.read.json(path="datasource/twitter_tweets", schema=final_schema)
    print("Success: Twitter tweets were extracted")
    return cryptocurrency_tweets_df

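## Process tweets and split them into time, user and content dataframes
def transform_twitterdata(tweets_df):       
    """
    Description: Clean the tweets and split them into time, user and content dataframes
                 that are related by the tweet id.
    Parameters: 
        tweets_df: Contains all cryptocurrency tweets from the filesystem
    Returns: 
        Tuple (tweets_time_df, tweets_user_df, tweets_content_df)
    """  
    
    # @udf: Convert the ctime-like Twitter format to a datetime string. The UTC offset is dropped
    #       because UTC time is applicable for the entire project.
    get_time_value_sequence_datetime = udf(lambda dateTimeUtc: datetime.datetime.strptime(dateTimeUtc, "%a %b %d %H:%M:%S %z %Y")
                                                                                .strftime("%Y-%m-%d %H:%M:%S") if dateTimeUtc else None)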
    
    # Ensure data quality by dropping rows with a null id and duplicate ids (the result has to be assigned)
    tweets_df = tweets_df.na.drop(subset=["id"]).dropDuplicates(["id"])
    
    # Convert ctime format to datetime allowing to extract date segments within dataframe
    tweets_df = tweets_df.withColumn("datetime", get_time_value_sequence_datetime(tweets_df.date)) 
    
    # Contains time related information for each tweet splitted into time segments.
    tweets_time_df = tweets_df.select(["id",
                                       month("datetime").alias("month"),
                                       year("datetime").alias("year"),
                                       dayofweek("datetime").alias("weekday"),
                                       weekofyear("datetime").alias("weekofyear"),
                                       "datetime"])
    
    # Contains username and how many followers belong to a user
    tweets_user_df = tweets_df.select("id",
                                      "user",
                                      "followers_count")
    
    # Contains content related to a tweet, like the tweet message and how many times it was liked and retweeted.
    tweets_content_df = tweets_df.select("id",
                                         "text",
                                         "favorite_count",
                                         "retweet_count")
    
    # Return dataframes as tuple for post-processing
    print("Success: Tweet dataframes were transformed")
    return tweets_time_df, tweets_user_df, tweets_content_df

## Extract tweets from filesystem
tweets_df = extract_twitterdata()  
## Process cryptocurrency tweets and denormalize tweets into time, user and content tweets
tweets_time_df, tweets_user_df, tweets_content_df = transform_twitterdata(tweets_df)

Success: Twitter tweets were extracted
Success: Tweet dataframes were transformed


### 3. Run quality checks on tweets and marketdata
Before the data are persisted, dedicated quality checks are executed on the market and Twitter data. The quality checks run on all six dataframes created as the outcome of the transformation process.
If a dataframe contains no records, its quality check does not pass. Furthermore, the quality check highlights dataframes with low data quality due to NaN or null values. 
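The rules above boil down to two decisions: an empty table fails, and any column with null or NaN entries is flagged as low quality without blocking the write. A minimal pure-Python sketch of that decision logic over a list of dicts (hypothetical `quality_check`; in PySpark the per-column counts could come from `count(when(condition, lit(1)))`). NaN only exists for floating-point values, so nulls need their own test.

```python
import math

def null_or_nan(value):
    """True for None and for float NaN; other types are never NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def quality_check(rows):
    """Return (passed, low_quality_columns) following the notebook's rules."""
    if not rows:
        return False, []  # no records: do not persist
    columns = rows[0].keys()
    flagged = [c for c in columns if any(null_or_nan(r.get(c)) for r in rows)]
    return True, flagged  # records present: persist, but report flagged columns

# Made-up rows for illustration
rows = [{"id": 1, "text": "a", "score": 0.5},
        {"id": 2, "text": None, "score": float("nan")}]

print(quality_check(rows))  # (True, ['text', 'score'])
print(quality_check([]))    # (False, [])
```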

In [5]:
def data_quality_report(data_quality_df):
    """
    Description: Gives a statement about the data quality. Any NaN or null values result in low quality
                 of the respective dataframe.
    Parameters: 
        data_quality_df: Single-row dataframe holding the number of NaN/null values per column.
    Returns: 
        None
    """    
    # Fetch the single row of counts once instead of running one Spark job per column
    counts = data_quality_df.first().asDict()

    # Report exactly one verdict: low quality if any column has missing values, high otherwise
    if any(value > 0 for value in counts.values()):
        print("Data Quality Low. Check report")
        data_quality_df.show()
    else:
        print("Data Quality High. No action required")

def data_quality_check(data_df):
    """
    Description: Validate the quality of a dataframe and report quality issues in case it contains null or NaN entries.
                 Note: If the record count is 0, the dataframe is not saved on the filesystem.
                       If columns contain NaN/null values, the dataframe is stored on the filesystem, but
                       low data quality is reported. In all other cases, high data quality is reported.
    Parameters: 
        data_df: Dataframe whose quality is checked
    Returns: 
        Boolean: True if the quality check was successful, otherwise False.
    """    

    # Count available records ensuring records are available
    count_records = data_df.count()
    if count_records == 0:
        print("Error: No records available")
        return False

    def is_missing(column, dtype):
        # Every column is checked for null; isnan only applies to floating-point columns
        missing = col(column).isNull()
        if dtype in ("float", "double"):
            missing = missing | isnan(col(column))
        return missing

    # Aggregate the number of missing values per column. lit(1) is counted because count()
    # ignores nulls, so counting the column itself would never count its null entries.
    data_quality_df = data_df.select([count(when(is_missing(column, dtype), lit(1))).alias(column)
                                      for column, dtype in data_df.dtypes])

    data_quality_report(data_quality_df)
    return True
    
## Execute all validation checks for available dataframes.
## If the validation was successful, True is returned. Otherwise False.
data_quality_tweets_time = data_quality_check(tweets_time_df)
data_quality_tweets_user = data_quality_check(tweets_user_df)
data_quality_tweets_content = data_quality_check(tweets_content_df)
data_quality_marketdata_time = data_quality_check(df_marketdata_time)
data_quality_marketdata_trade = data_quality_check(df_marketdata_trade)
data_quality_marketdata_symbol = data_quality_check(df_marketdata_symbol)

Data Quality High. No action required
Data Quality High. No action required
Data Quality High. No action required
Data Quality High. No action required
Data Quality High. No action required
Data Quality High. No action required


### 4. Persist tweets and marketdata on filesystem
If a dataframe passed its quality check, the respective data are persisted as Parquet files on the filesystem, using a dedicated directory for each dataframe.

In [6]:
def load_data(data_df, path, passed_data_quality):
    """
    Description: Load a final dataframe into the target system. The write operation is only executed
                 if the respective data quality check was passed.
    Parameters: 
        data_df: Dataframe being persisted
        path: Directory on the filesystem where the data are persisted
        passed_data_quality: Boolean - outcome of the data quality check
    Returns: 
        None
    """    
    
    # Write the dataframe to the filesystem only if the quality checks were successful (True);
    # otherwise deny the write operation. DataFrameWriter.parquet() returns None, so nothing is assigned.
    if passed_data_quality:
        data_df.write.parquet(path=path, mode="overwrite")
        print("Success: Dataframe was loaded into targetsystem")
    else:
        print("Error: Data not written into File Storage System. Data Quality check was not passed.")

# Define destination path for parquet files
path_tweets_time = "data/tweetsdata/time.parquet"
path_tweets_user = "data/tweetsdata/user.parquet"
path_tweets_content = "data/tweetsdata/content.parquet"
path_marketdata_time = "data/marketdata/time.parquet"
path_marketdata_trade = "data/marketdata/trade.parquet"
path_marketdata_symbol = "data/marketdata/symbol.parquet"

## Persist all dataframes in local storage system
## Each dataframe is only written if its data quality check passed.
load_data(tweets_time_df, path_tweets_time, data_quality_tweets_time)
load_data(tweets_user_df, path_tweets_user, data_quality_tweets_user)
load_data(tweets_content_df, path_tweets_content, data_quality_tweets_content)
load_data(df_marketdata_time, path_marketdata_time, data_quality_marketdata_time)
load_data(df_marketdata_trade, path_marketdata_trade, data_quality_marketdata_trade)
load_data(df_marketdata_symbol, path_marketdata_symbol, data_quality_marketdata_symbol)

Success: Dataframe was loaded into targetsystem
Success: Dataframe was loaded into targetsystem
Success: Dataframe was loaded into targetsystem
Success: Dataframe was loaded into targetsystem
Success: Dataframe was loaded into targetsystem
Success: Dataframe was loaded into targetsystem


### Create Data Dictionary: 
Instead of creating an additional data dictionary, the schemas of the PySpark dataframes are printed, since they already list each column's name, data type and nullability.
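If a tabular data dictionary is still wanted, the `printSchema()` output can be turned into one. A minimal sketch that parses the tree text (sample lines copied from the `df_marketdata` schema printed below) into column, type and nullability; `parse_schema_tree` is a hypothetical helper that only handles flat, non-nested schemas. In PySpark, `df.schema.fields` exposes the same information directly without any parsing.

```python
import re

# Matches flat printSchema() lines such as " |-- open: decimal(20,10) (nullable = true)"
LINE = re.compile(r"^\s*\|--\s+(?P<name>[^:]+):\s+(?P<type>.+?)\s+\(nullable = (?P<nullable>true|false)\)\s*$")

def parse_schema_tree(text):
    """Turn printSchema() output into (column, type, nullable) tuples; skips 'root' and blank lines."""
    rows = []
    for line in text.splitlines():
        match = LINE.match(line)
        if match:
            rows.append((match["name"], match["type"], match["nullable"] == "true"))
    return rows

schema_text = """root
 |-- open_time: long (nullable = true)
 |-- open: decimal(20,10) (nullable = true)
 |-- symbol: string (nullable = false)
"""

# Emit a Markdown table that can serve as a data dictionary
print("| column | type | nullable |")
print("|---|---|---|")
for name, dtype, nullable in parse_schema_tree(schema_text):
    print(f"| {name} | {dtype} | {nullable} |")
```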

In [7]:
## Print schemas instead of creating a data dictionary as pyspark works with dataframes.
# Dataframes received from the APIs
df_marketdata.printSchema()
tweets_df.printSchema()

# Final dataframes being written into filesystem
tweets_time_df.printSchema()
tweets_user_df.printSchema() 
tweets_content_df.printSchema() 
df_marketdata_time.printSchema()
df_marketdata_symbol.printSchema()
df_marketdata_trade.printSchema()

## Show sample rows of the source and final tables
df_marketdata.show(5)
tweets_df.show(5)

# Final dataframes being written into filesystem
tweets_time_df.show(5)
tweets_user_df.show(5) 
tweets_content_df.show(5)
df_marketdata_time.show(5)
df_marketdata_symbol.show(5)
df_marketdata_trade.show(5)

root
 |-- open_time: long (nullable = true)
 |-- open: decimal(20,10) (nullable = true)
 |-- high: decimal(20,10) (nullable = true)
 |-- low: decimal(20,10) (nullable = true)
 |-- close: decimal(20,10) (nullable = true)
 |-- volume: decimal(20,10) (nullable = true)
 |-- close_time: long (nullable = true)
 |-- quote_asset_volume: decimal(20,10) (nullable = true)
 |-- number_of_trades: integer (nullable = true)
 |-- taker_buy_base_asset_volume: decimal(20,10) (nullable = true)
 |-- taker_buy_quote_asset_volume: decimal(20,10) (nullable = true)
 |-- symbol: string (nullable = false)

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- followers_count: long (nullable = true)

root
 |-- id: long (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer 

In [2]:
import os
import tarfile

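def recursive_files(dir_name='.', ignore=None):
    """Yield all file paths below dir_name, skipping files and directories whose name is in ignore."""
    ignore = ignore or set()
    for root, subdirs, files in os.walk(dir_name):
        # Prune ignored directories in place so os.walk does not descend into them
        subdirs[:] = [d for d in subdirs if d not in ignore]

        for file_name in files:
            if file_name in ignore:
                continue

            yield os.path.join(root, file_name)

def make_tar_file(dir_name='.', tar_file_name='tarfile.tar', ignore=None):
    """Pack all non-ignored files below dir_name into an uncompressed tar archive."""
    # The context manager closes the archive even if adding a file fails
    with tarfile.open(tar_file_name, 'w', dereference=True) as tar:
        for file_name in recursive_files(dir_name, ignore):
            tar.add(file_name)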


dir_name = '.'
tar_file_name = 'archive.tar'
ignore = {'.ipynb_checkpoints', '__pycache__', tar_file_name}
make_tar_file(dir_name, tar_file_name, ignore)