# Salvando Dados da FakerAPI em diferentes destinos e formatos
Fonte dos Dados: https://fakerapi.it/en 
- MongoDB
- AWS S3
- CSV
- json
- Parquet

## Importando as libs necessárias

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import requests
import json
import boto3

# MongoDB settings, left blank in this notebook.
# `user` and `password` are the values get_database() passes to conn_database().
user = ''
password = ''
# NOTE(review): `collection` is not referenced anywhere else in the visible
# notebook (get_collection() uses the literal 'customer') — confirm it is needed.
collection = ''

## Criando uma Sessão Spark

In [111]:
# Build (or reuse) a Spark session running on the 'local' master,
# registered under the application name 'api_to_mongodb'.
spark = SparkSession.builder.master('local').appName('api_to_mongodb').getOrCreate()

## Fazendo a requisição da API

In [112]:
def requests_api():
    """Request 100 person records from the FakerAPI.

    Returns:
        The ``requests.Response`` when the call succeeds with a non-error
        HTTP status, or ``None`` when the request fails (connection error,
        timeout or HTTP error status). Failures are printed, not raised.
    """
    try:
        # requests applies no timeout by default, so without one a stalled
        # server would block this cell indefinitely. A timeout is raised as a
        # RequestException subclass and is handled below like other failures.
        r = requests.get(
            'https://fakerapi.it/api/v1/persons?_quantity=100',
            timeout=30,
        )
        r.raise_for_status()
        return r
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return None

In [113]:
requests_api()

<Response [200]>

## Obtendo os Dados da API

In [149]:
def get_data():
    """Return the list stored under the ``data`` key of the API response.

    Returns:
        The value of ``response.json()['data']``, or ``None`` when the
        request failed or the body could not be decoded / has no ``data`` key.
    """
    response = requests_api()
    # requests_api() signals failure by returning None (it has already
    # printed the error), so there is nothing to decode in that case.
    if response is None:
        return None

    try:
        return response.json()['data']
    except (requests.exceptions.RequestException, ValueError, KeyError) as e:
        # ValueError covers a body that is not valid JSON; KeyError covers a
        # JSON document without the expected 'data' key.
        print(f"Error: {e}")
        return None

In [150]:
get_data()

[{'id': 1,
  'firstname': 'Marjolaine',
  'lastname': 'Rosenbaum',
  'email': 'handerson@gmail.com',
  'phone': '+7454666536414',
  'birthday': '1971-11-18',
  'gender': 'female',
  'address': {'id': 0,
   'street': '338 Langosh Gardens',
   'streetName': "O'Connell Camp",
   'buildingNumber': '6842',
   'city': 'Port Paoloburgh',
   'zipcode': '55359-9514',
   'country': 'Netherlands',
   'county_code': 'WF',
   'latitude': 65.13877,
   'longitude': 175.247248},
  'website': 'http://lesch.info',
  'image': 'http://placeimg.com/640/480/people'},
 {'id': 2,
  'firstname': 'Demetrius',
  'lastname': 'Bode',
  'email': 'sward@cronin.com',
  'phone': '+5447141694359',
  'birthday': '1963-05-05',
  'gender': 'male',
  'address': {'id': 0,
   'street': '680 Brakus Cliffs Suite 494',
   'streetName': 'Gutkowski Key',
   'buildingNumber': '9790',
   'city': 'Mohrmouth',
   'zipcode': '88611',
   'country': 'Falkland Islands (Malvinas)',
   'county_code': 'GL',
   'latitude': -46.855216,
   'lo

## Salvando os dados em um DataFrame PySpark

In [308]:
def data_from_api_to_df():
    """Build a PySpark DataFrame from the records returned by get_data().

    Each record is flattened into a single Row: the nested ``address`` fields
    are lifted to top-level columns and the columns receive Portuguese names.
    On any error the exception is printed and the function returns None.
    """
    records = get_data()

    def to_row(person):
        # One flat Row per API record, columns in the same order as the API fields.
        return Row(
            id=person['id'],
            nome=person['firstname'],
            sobrenome=person['lastname'],
            email=person['email'],
            telefone=person['phone'],
            data_nascimento=person['birthday'],
            genero=person['gender'],
            id_endereco=person['address']['id'],
            endereco=person['address']['street'],
            rua=person['address']['streetName'],
            numero=person['address']['buildingNumber'],
            cidade=person['address']['city'],
            cep=person['address']['zipcode'],
            pais=person['address']['country'],
            cod_pais=person['address']['county_code'],
            latitude=person['address']['latitude'],
            longitude=person['address']['longitude'],
            site=person['website'],
            imagem=person['image'],
        )

    try:
        rows = []
        for person in records:
            rows.append(to_row(person))
        return spark.createDataFrame(rows)
    except Exception as err:
        print(err)

In [309]:
df = data_from_api_to_df()

In [310]:
df.show(2)

+---+--------+---------+--------------------+--------------+---------------+------+-----------+--------------------+-------------+------+------------+-----+--------------------+--------+----------+----------+------------------+--------------------+
| id|    nome|sobrenome|               email|      telefone|data_nascimento|genero|id_endereco|            endereco|          rua|numero|      cidade|  cep|                pais|cod_pais|  latitude| longitude|              site|              imagem|
+---+--------+---------+--------------------+--------------+---------------+------+-----------+--------------------+-------------+------+------------+-----+--------------------+--------+----------+----------+------------------+--------------------+
|  1|   Cyrus|     Hane|delaney.wisozk@st...|+9288929886870|     1994-06-10|  male|          0|2460 Bartoletti L...|  Brown Views| 73363|Dibbertmouth|99942|Antarctica (the t...|      SE|-26.746126|   28.7839|http://schmidt.biz|http://placeimg.c...|
|  2

## Salvando em CSV

In [340]:
# Write the DataFrame as CSV into the "consumer_data" folder, replacing any previous output.
df.write.mode('overwrite').format("csv").save("consumer_data")

## Salvando em Parquet

In [341]:
# Write the DataFrame as Parquet into "consumer_data_parquet", replacing any previous output.
df.write.mode('overwrite').format("parquet").save("consumer_data_parquet")

## Salvando em json

In [342]:
# Write the DataFrame as JSON into "consumer_data_json", replacing any previous output.
df.write.mode('overwrite').format("json").save("consumer_data_json")

## Salvando os dados no MongoDB Atlas

### Instalação do PyMongo

In [166]:
!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.6.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.4.2-py3-none-any.whl.metadata (4.9 kB)
Downloading pymongo-4.6.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (680 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m680.8/680.8 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
[?25hDownloading dnspython-2.4.2-py3-none-any.whl (300 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m300.4/300.4 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.4.2 pymongo-4.6.1


### Importação do PyMongo

In [181]:
from pymongo.mongo_client import MongoClient

### Conexão com o PyMongo

In [263]:
def conn_database(user, password):
    """Open a client connection to the MongoDB Atlas cluster.

    Args:
        user: Database user name.
        password: Password for ``user``.

    Returns:
        A ``MongoClient`` for the cluster, or ``None`` if the client could
        not be created (the error is printed).
    """
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped, otherwise characters such as
    # '@', ':' or '/' in them break URI parsing. The '?' after the '/' is what
    # turns retryWrites/w into connection options instead of a path segment.
    connection_string = (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}"
        "@cluster0.oudmtpq.mongodb.net/?retryWrites=true&w=majority"
    )
    try:
        # Constructing the client is the step that can raise (invalid URI,
        # failed DNS SRV lookup), so it has to live inside the try.
        return MongoClient(connection_string)
    except Exception as e:
        print(e)
        return None

### Pegando o Database

In [304]:
def get_database():
    """Return the ``shop`` database from the Mongo connection, or None on error.

    Connects with the module-level ``user`` and ``password``; any exception
    is printed and turned into a None result.
    """
    try:
        return conn_database(user, password)['shop']
    except Exception as err:
        print(err)
    return None

### Pegando a collection

In [320]:
def get_collection():
    """Return the ``customer`` collection of the database, or None on error.

    Any exception raised while fetching the database or indexing into it is
    printed and turned into a None result.
    """
    try:
        return get_database()['customer']
    except Exception as err:
        print(err)
    return None

### Inserindo os dados da API no MongoDB Atlas

In [321]:
# Fetch the API records and the Mongo collection handle at cell level.
# NOTE(review): insert_data_from_api_to_mongo() calls get_data() and
# get_collection() itself and does not read `dados` or `insert` — confirm
# whether this cell is still needed.
dados = get_data()
insert = get_collection()

In [332]:
def insert_data_from_api_to_mongo():
    """Fetch the API records and insert them into the Mongo collection.

    Prints 'OK' after a successful insert. When there is nothing to insert,
    no collection is available, or the insert fails, a message is printed
    instead. Always returns None.
    """
    try:
        api_data = get_data()
        # get_data() returns None on failure; an empty list has nothing to insert.
        if not api_data:
            print('No data returned by the API; nothing was inserted')
            return None

        db_collection = get_collection()
        # Compare with None explicitly rather than relying on truthiness of
        # the collection object.
        if db_collection is None:
            print('MongoDB collection is unavailable; nothing was inserted')
            return None

        db_collection.insert_many(api_data)
        print('OK')
    except Exception as e:
        print(e)
    return None

In [333]:
insert_data_from_api_to_mongo()

OK


## Salvando os dados no AWS S3

### Instância do S3 Bucket

In [3]:
# S3 client for the us-east-1 region. The credential strings are blank here;
# presumably they are filled in before this cell is run — confirm.
S3 = boto3.client(
    's3',
    region_name='us-east-1',
    aws_access_key_id='',
    aws_secret_access_key='',
    aws_session_token='',
)




### Criação do Bucket S3

In [1]:
def create_s3_buckets(*args):
    """Create one private S3 bucket for each name given.

    Args:
        *args: Bucket names to create.

    Each bucket is attempted independently: a failure is printed and the
    remaining names are still processed. The name of every bucket created
    successfully is printed.
    """
    for bckt in args:
        # Handle errors per bucket so one rejected name does not prevent the
        # remaining buckets from being created.
        try:
            S3.create_bucket(
                ACL='private',  # Controls access to the bucket and its objects
                Bucket=bckt,
            )
        except Exception as e:
            print(e)
        else:
            print(bckt)

In [6]:
def upload_file_to_s3(folder_name_source_files, bucket):
    """Upload every ``.csv`` file found in a folder to an S3 bucket.

    Args:
        folder_name_source_files: Folder holding the files, resolved relative
            to the current working directory (an absolute path is used as is).
        bucket: Name of the destination bucket.

    Each file is uploaded under its own file name as the object key, and the
    name is printed after a successful upload. Errors are printed, not
    raised; a failed upload does not stop the remaining files.
    """
    import os

    try:
        source_dir = os.path.join(os.getcwd(), folder_name_source_files)
        list_files = os.listdir(source_dir)
    except (OSError, TypeError) as e:
        print(e)
        return

    for file in list_files:
        # Match on the extension only: a substring test would also pick up
        # names such as '.part-0.csv.crc' (checksum files Spark may write
        # next to its CSV parts).
        if not file.endswith('.csv'):
            continue
        try:
            S3.upload_file(
                Filename=os.path.join(source_dir, file),
                Bucket=bucket,
                Key=file,
            )
        except Exception as e:
            print(e)
        else:
            print(file)




In [9]:
create_s3_buckets('api_to_s3')




In [None]:
# nome da pasta ontem estão os arquivos
# nome do bucket
upload_file_to_s3('arquivos', 'api_to_s3')