# Análisis de las divisas de criptomonedas

__Introducción__

Las criptomonedas son monedas virtuales, pueden ser intercambiadas y operadas como cualquier otra divisa tradicional, pero están fuera del control de los gobiernos e instituciones financieras. Existe un gran número de criptodivisas disponibles, todas con sus propias características y aplicaciones. Las que tienen mayor capitalización bursátil son, al menos por ahora, una minoría que incluye el bitcoin, el bitcoin cash, el ether, el litecoin y el dash.

Las criptomonedas pueden considerarse una alternativa a las divisas tradicionales, pero en realidad fueron concebidas como una solución de pago completamente convencional. En estos momentos, un buen número de tiendas aceptan criptomonedas como forma de pago.

<!--![my_test_image](https://raw.githubusercontent.com/gandres-dev/CursoDatosMasivosI/master/img/bitcoin.jpg)-->

<img src="https://raw.githubusercontent.com/gandres-dev/CursoDatosMasivosI/master/img/bitcoin.jpg" alt="drawing" style="display: block;
  margin-left: auto;
  margin-right: auto;
  width: 30%;"/>

### Conectando Apache Kafka con Spark Structured streaming
Nos suscribimos al topico de criptos para obtener los datos generados del productor:

In [0]:
#Carga de la biblioteca
import pyspark
# Carga funciones extra
from pyspark.sql.functions import * 
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
criptos_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", "criptos") \
    .load()

criptos_df_string = criptos_df.selectExpr("CAST(value AS STRING)", "timestamp")
criptos_df_string

Out[4]: DataFrame[value: string, timestamp: timestamp]

Creamos un esquema para nuestro dataframe a traves de `json_tuple`:

In [0]:
from pyspark.sql.functions import explode,split,col, json_tuple

criptos_prices = criptos_df_string.select(json_tuple(col("value"), 'BTC', 'USD', 'EUR'), col("timestamp")) \
                 .select(col("c0").alias("BTC"), col("c1").alias("USD"), col("c2").alias("EUR"), col("timestamp").alias("timestamp"))
print(criptos_prices)

DataFrame[BTC: string, USD: string, EUR: string, timestamp: timestamp]


## Precio de dolares a traves tiempo

Como primer analisis, nos enfocaremos en el precio del dolar a traves del tiempo con el objetivo de observar los precios y ver su comportamiento mediante del flujo de datos.

In [0]:
cripto_tab = criptos_prices.withColumn('precio_btc', criptos_prices.BTC.cast('float'))
cripto_tab = cripto_tab.withColumn('precio_usd', criptos_prices.USD.cast('float'))
cripto_tab = cripto_tab.withColumn('precio_eur', criptos_prices.EUR.cast('float'))

print(cripto_tab)

writeBitcoin = cripto_tab.writeStream.format("memory"). \
    queryName("bitcoinquery"). \
    trigger(processingTime='2 seconds'). \
    start()

DataFrame[BTC: string, USD: string, EUR: string, timestamp: timestamp, precio_btc: float, precio_usd: float, precio_eur: float]


In [0]:
%sql
select date_format(timestamp, "hh:mm") as minuto,
AVG(precio_usd) as precio_dolares, AVG(precio_eur) as precio_euros, AVG(precio_btc) as precio_bitcoin
from bitcoinquery
GROUP BY minuto

minuto,precio_dolares,precio_euros,precio_bitcoin
03:22,1996.4570068359376,1886.380017089844,0.0665640011429786
03:21,1991.4077826605903,1882.217787000868,0.0665166651209195
03:23,2001.0499877929688,1890.205017089844,0.0665790006518363
03:24,2006.517267400568,1895.009099786932,0.0666400018063458
03:25,2009.920007324219,1899.269995117188,0.0666500024497509
03:26,2010.05400390625,1899.339013671875,0.0666889980435371
03:27,2007.5245361328125,1897.1527210582387,0.0666818171739578
03:28,2008.1230224609376,1897.085986328125,0.0667060010135173
03:29,2008.3357107979912,1896.8243059430804,0.066720001399517


## Precio minimo y maximo por minuto del dolar

Conoceremos el valor minimo y maximo que podemos encontrar en cada minuto.

In [0]:
%sql
SELECT date_format(timestamp, "hh:mm") as minuto,
MAX(precio_usd) as precio_dolar_maximo, MIN(precio_usd) as precio_dolar_minimo
from bitcoinquery
GROUP BY minuto

minuto,precio_dolar_maximo,precio_dolar_minimo
03:22,1999.5,1992.78
03:21,1992.78,1990.44
03:23,2001.96,2000.54
03:24,2009.73,2000.93
03:25,2010.75,2008.49
03:26,2011.05,2008.71
03:27,2009.42,2006.39
03:28,2008.66,2007.36
03:29,2009.59,2007.93
03:30,2014.63,2009.82


## Precio minimo y maximo por minuto del euro

Con el mismo procedimiento, lo haremos ahora con la moneda del euro:

In [0]:
%sql
SELECT date_format(timestamp, "hh:mm") as minuto,
MAX(precio_eur) as precio_euro_maximo, MIN(precio_eur) as precio_euro_minimo
from bitcoinquery
GROUP BY minuto

minuto,precio_euro_maximo,precio_euro_minimo
03:25,1900.45,1898.09
03:26,1899.79,1897.96
03:22,1890.38,1883.13
03:24,1899.96,1889.82
03:21,1882.37,1882.07
03:23,1890.93,1889.81
03:29,1898.11,1896.23
03:31,1903.29,1898.56
03:28,1897.57,1895.78
03:27,1898.25,1895.68


### Frecuencias del dolar

A traves de los precios construiremos un histograma de frecuencias para ver conocer el valor que más se repite en el flujo.

In [0]:
%sql
select int(USD), count(USD) from bitcoinquery
GROUP BY int(USD)

USD,count(USD)
1990,4
1994,3
1991,2
1995,1
1992,4
1997,2
1998,2
2001,4
2000,9
1999,1


### Precias del dolar y euro a traves del tiempo

In [0]:
%sql
select int(USD), int(EUR) from bitcoinquery

USD,EUR
1990,1882
1990,1882
1992,1882
1992,1882
1991,1882
1991,1882
1990,1882
1990,1882
1992,1882
1992,1883


## Palabras más usuadas en las noticas de Criptomonedas

Dado que podemo extraer diferentes noticias sobre temas de criptomonedas, analizaremos la palabras que más repiten en este flujo de noticias.

In [0]:
# Leemos nuestro datos del topico de noticias
# Se debe de correr primero de producer-noticias para capturar los titulos
cripto_noticias_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", "noticias") \
    .load()

# type cast the column value
cripto_noticias_df_string = cripto_noticias_df.selectExpr("CAST(value AS STRING)")

Para este analisis nos enfocaremos en los titulos de noticias para extraer las palabras:

In [0]:
from pyspark.sql.functions import explode,split,col

# Separamos por valabras y las contamos
noticias_tab = cripto_noticias_df_string.withColumn('word', explode(split(col('value'), ' '))) \
    .groupBy('word') \
    .count() \
    .sort('count', ascending=False)

In [0]:
writeNoticias = noticias_tab.writeStream. \
    outputMode("complete"). \
    format("memory"). \
    queryName("noticiasquery"). \
    trigger(processingTime='1 seconds'). \
    start()

In [0]:
%sql 
select * from noticiasquery

word,count
bitcoin,6
price,3
usd,2
1inch,2
–,2
prediction,2
barrier,1
schools,1
losing,1
cfo,1


## Resources
- https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=e588c84d-c012-4c07-8593-b08d1bf1b087' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>