In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=f58432217e1aaba4545d6b9ede0bfa9a86dabea700d0e3d2187da8382626f901
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from threading import Thread
import requests
import json
import shutil
import os
import time

In [5]:
spark = SparkSession.builder.appName("Lab07").getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

temp_dir = "./tmp"
if os.path.exists(temp_dir):
  shutil.rmtree(temp_dir)
print(f"Diretorio criado: {temp_dir}")
os.makedirs(temp_dir)

Diretorio criado: ./tmp


In [6]:
def fetch_api_data():
  counter = 0
  while True:
    try:
      response = requests.get('https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd')
      if response.status_code == 200:
        data = response.json()
        file_path = f"{temp_dir}/data_{counter}.json"
        with open(file_path, "w") as f:
          json.dump(data, f)
        print(f"Dados da API obtidos: {data}\nSalvos em :{file_path}")
        counter +=1
      else:
        print(f"Falha ao obter dados da API: {response.status_code}")
    except Exception as e:
      print(f"Erro ao fazer vizualizacao: {e}")
    time.sleep(30)

In [None]:
print("iniciando thread para obtencao de dados da api...")
data_thread = Thread(target=fetch_api_data)
data_thread.start()

print("configurando leitura de dados de streaming...")
lines = spark.readStream.schema('bitcoin double').json(temp_dir)

print("configurando transformacao de dados...")
prices = lines.select(col("bitcoin").alias("bitcoin_prince"))

print("iniciando consulta de streaming...")
query = prices.writeStream.outputMode("append").format("parquet")\
    .option('checkpointLocation', f"{temp_dir}/output")\
    .option("path", f"{temp_dir}/output").start()

print("Streaming iniciando aguardando interrupcao...")
query.awaitTermination()

iniciando thread para obtencao de dados da api...
configurando leitura de dados de streaming...
Dados da API obtidos: {'bitcoin': {'usd': 69105}}
Salvos em :./tmp/data_0.json
configurando transformacao de dados...
iniciando consulta de streaming...
Streaming iniciando aguardando interrupcao...
Dados da API obtidos: {'bitcoin': {'usd': 69105}}
Salvos em :./tmp/data_1.json
Dados da API obtidos: {'bitcoin': {'usd': 69105}}
Salvos em :./tmp/data_2.json
Dados da API obtidos: {'bitcoin': {'usd': 69105}}
Salvos em :./tmp/data_3.json
Dados da API obtidos: {'bitcoin': {'usd': 69100}}
Salvos em :./tmp/data_4.json
Dados da API obtidos: {'bitcoin': {'usd': 69100}}
Salvos em :./tmp/data_5.json
Dados da API obtidos: {'bitcoin': {'usd': 69100}}
Salvos em :./tmp/data_6.json
Dados da API obtidos: {'bitcoin': {'usd': 69100}}
Salvos em :./tmp/data_7.json
Dados da API obtidos: {'bitcoin': {'usd': 69100}}
Salvos em :./tmp/data_8.json
Dados da API obtidos: {'bitcoin': {'usd': 69100}}
Salvos em :./tmp/data_9