# Document the Data Sources
* [Coingecko python code](https://github.com/man-c/pycoingecko)

In [58]:
'''
    WARNING CONTROL to display or ignore all warnings
'''
import warnings; warnings.simplefilter('ignore')     #switch betweeb 'default' and 'ignore'
import traceback

''' Set debug flag to view extended error messages; else set it to False to turn off debugging mode '''
debug = True

## Initialize classes

In [59]:
import os
import sys
import configparser
from datetime import datetime, date, timedelta

sys.path.insert(1,"/home/nuwan/workspace/rezaware/")
import rezaware as reza
from wrangler.modules.assets.etl import cryptoMCExtractor as mcap
from utils.modules.etl.load import sparkDBwls as spark
from utils.modules.etl.load import sparkNoSQLwls as nosql

''' restart initiate classes '''
if debug:
    import importlib
    reza = importlib.reload(reza)
    mcap = importlib.reload(mcap)
    nosql = importlib.reload(nosql)
    spark = importlib.reload(spark)
    
__desc__ = "get crypto macket capitalization data"
clsNoSQL = nosql.NoSQLWorkLoads(desc=__desc__)
clsSpark = spark.SQLWorkLoads(desc=__desc__)
''' optional - if not specified class will use the default values '''
prop_kwargs = {"WRITE_TO_TMP":True,   # necessary to emulate the etl dag
              }
clsMC = mcap.CryptoMarkets(desc=__desc__, **prop_kwargs)
print("\nClass initialization and load complete!")

All python packages in rezaware loaded successfully!
All assets-module etl-packages in function-CryptoMarket imported successfully!
All etl-module load-packages in function-sparkNoSQLwls imported successfully!
All packages in utils etl load sparkdbwls imported successfully!
sparkNoSQLwls Class initialization complete
sparkNoSQLwls Class initialization complete
CryptoMarket Class initialization complete

Class initialization and load complete!


## Transform collections into Spark DataFrame

In [60]:
# _data_owner='coingecko'
__SOURCEDBNAME__ = 'tip-historic-marketcap'
__DESTINDBNAME__ = 'tip'
__DESTINTABLE__ = 'mcap_past'

_from_dt=date(2022,1,1)
_to_dt=date(2022,3,31)
_kwargs = {
#     "SOURCEDBNAME":'tip-historic-marketcap',
    "DBAUTHSOURCE":'tip-historic-marketcap',
   "HASINNAME":'coingecko.2022-04-01.btc',
#     "COLLLIST":['coingecko.2022-04-01.btc','coingecko.2022-04-01.etc'],
#     "COLLLIST":['coingecko.2022-04-01.btc'],
    "DESTINTBLNAME":'mcap_past',
    "FIND":{'symbol':{'$eq':'btc'}},   # use the find key to define a filter
    "COLUMNSMAP":{'_id':'uuid',
                 'source':'data_source',
                 'symbol':'asset_symbol',
                 'date':'mcap_date',
                 'marketcap':'mcap_value'
                }
}
_data = clsMC.nosql_to_sql(
    source_db=__SOURCEDBNAME__,
    coll_list=[],
    destin_db=__DESTINDBNAME__,
    table_name=__DESTINTABLE__,
    **_kwargs,
)

In [83]:
# print(clsSpark.dbConnURL)
print(clsSpark.session)

<pyspark.sql.session.SparkSession object at 0x7feba0750400>


In [61]:
print(_data.shape)
_data

(2155, 5)


Unnamed: 0,uuid,data_source,asset_symbol,mcap_date,mcap_value
0,63957c2b9d51d344925eb63d,coingecko,btc,2022-04-01 00:01:18.562,8.837488e+11
1,63957c2b9d51d344925eb63e,coingecko,btc,2022-04-01 01:01:32.205,8.727383e+11
2,63957c2b9d51d344925eb63f,coingecko,btc,2022-04-01 02:02:19.255,8.698273e+11
3,63957c2b9d51d344925eb640,coingecko,btc,2022-04-01 03:02:05.569,8.728794e+11
4,63957c2b9d51d344925eb641,coingecko,btc,2022-04-01 04:01:42.978,8.659717e+11
...,...,...,...,...,...
2150,63957c2b9d51d344925ebea3,coingecko,btc,2022-06-29 19:02:05.966,3.836757e+11
2151,63957c2b9d51d344925ebea4,coingecko,btc,2022-06-29 20:02:13.167,3.834211e+11
2152,63957c2b9d51d344925ebea5,coingecko,btc,2022-06-29 21:01:46.179,3.820171e+11
2153,63957c2b9d51d344925ebea6,coingecko,btc,2022-06-29 22:03:34.850,3.802612e+11


## Spark MongoDB experiment

In [55]:
print(clsNoSQL.dbType,
      clsNoSQL.dbHostIP,
      clsNoSQL.dbPort,
      clsNoSQL.dbFormat,
      clsNoSQL.dbName,
      clsNoSQL.dbUser,
      clsNoSQL.dbPswd,
      clsNoSQL.dbAuthSource,
      clsNoSQL.dbAuthMechanism,
      clsNoSQL.sparkMaster,
     )

mongodb 127.0.0.1 27017 mongo tip-asset-metadata farmraider spirittribe tip-asset-metadata SCRAM-SHA-256 local[1]


In [51]:
from pyspark.sql import SparkSession
_appName = " ".join(
    [clsNoSQL.__app__,
     clsNoSQL.__name__,
     clsNoSQL.__package__,
     clsNoSQL.__module__
    ])   #"PySpark MongoDB Examples"
_master = clsNoSQL.sparkMaster  # "local[1]"
_h_ip = clsNoSQL.dbHostIP #"127.0.0.1"
_type = clsNoSQL.dbType   # mongodb
_port = clsNoSQL.dbPort   # 27017
_format = clsNoSQL.dbFormat # mongo
_user = clsNoSQL.dbUser   # "farmraider"
_pswd = clsNoSQL.dbPswd   # "spirittribe"
_auth = clsNoSQL.dbAuthSource   #"tip-historic-marketcap"
_db = clsNoSQL.dbName     # "tip-historic-marketcap"
_coll = 'coingecko.2022-07-01.btc'

_inp_uri = f"{_type}://{_user}:{_pswd}@{_h_ip}/{_db}.{_coll}?authSource={_auth}"
_out_uri = f"{_type}://{_user}:{_pswd}@{_h_ip}/{_db}.{_coll}?authSource={_auth}"

# Create Spark session
spark = SparkSession.builder \
    .appName(_appName) \
    .master(_master) \
    .config("spark.mongodb.input.uri", _inp_uri) \
    .config("spark.mongodb.output.uri", _out_uri) \
    .getOrCreate()

# Read data from MongoDB
df = spark.read.format(_format).load()
df.printSchema()
df.show()

22/12/16 15:15:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- marketcap: double (nullable = true)
 |-- source: string (nullable = true)
 |-- symbol: string (nullable = true)

+--------------------+-------------------+--------------------+---------+------+
|                 _id|               date|           marketcap|   source|symbol|
+--------------------+-------------------+--------------------+---------+------+
|{6390a3bf9d51d344...|2022-07-01 16:00:00|3.659842243234527...|coingecko|   btc|
|{6390a3bf9d51d344...|2022-07-02 16:00:00|3.709165299187163E11|coingecko|   btc|
|{6390a3bf9d51d344...|2022-07-03 16:00:00| 3.67706689137568E11|coingecko|   btc|
|{6390a3bf9d51d344...|2022-07-04 16:00:00|3.685493641350522...|coingecko|   btc|
|{6390a3bf9d51d344...|2022-07-05 16:00:00|3.864251762627784...|coingecko|   

## Transform & Load into Postgresql

In [138]:
_data_owner='coingecko'
_from_dt=date(2022,1,1)
_to_dt=date(2022,3,31)

_data = clsMC.trasnform_mcap(
    data_owner=_data_owner,
    from_date=_from_dt,
    to_date=_to_dt,
)
print(type(_data))

KeyboardInterrupt: 