In [None]:
import os

import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField, IntegerType
from pyspark.sql.functions import udf


def get_github_user_info(login, token):
    """Fetch public profile fields for one GitHub user.

    Args:
        login: GitHub login to look up.
        token: Token sent in the ``Authorization`` header.

    Returns:
        A 9-tuple ``(name, company, blog, email, bio, public_repos,
        followers, following, created_at)``. Every element is ``None`` when
        the lookup fails: network error or timeout, a status other than 200,
        or a body that is not a JSON object.
    """
    fields = (
        'name', 'company', 'blog', 'email', 'bio',
        'public_repos', 'followers', 'following', 'created_at',
    )
    no_data = (None,) * len(fields)

    url = f"https://api.github.com/users/{login}"
    headers = {"Authorization": f"token {token}"}
    try:
        # Without a timeout a single stalled connection would block the
        # caller (here: a Spark task) indefinitely.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException:
        # Treat transport failures like any other failed lookup instead of
        # aborting the whole job for one unreachable profile.
        return no_data

    if response.status_code != 200:
        return no_data

    try:
        user_info = response.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        return no_data
    if not isinstance(user_info, dict):
        return no_data

    return tuple(user_info.get(field) for field in fields)

class DataExtractorIfood:
    """Extracts GitHub follower data from the REST API or from CSV files.

    The constructor stores its arguments unchanged:
        spark: SparkSession used to create and read DataFrames.
        user: GitHub login whose followers are extracted.
        token: Token sent in the ``Authorization`` header of every request.
    """

    # Seconds to wait for GitHub before giving up on a request.
    _REQUEST_TIMEOUT = 30

    def __init__(self, spark: SparkSession, user: str, token: str):
        self.spark = spark
        self.user = user
        self.token = token

    def _auth_headers(self):
        """Build the authorization header sent with every API call."""
        return {"Authorization": f"token {self.token}"}

    def get_followers(self):
        """List every follower of ``self.user``.

        Returns:
            A DataFrame with a single string column ``login``; it is empty
            (but still has that column) when the user has no followers.

        Raises:
            requests.HTTPError: if GitHub answers with an error status
                (bad credentials, rate limit, unknown user, ...).
        """
        followers = []
        page = 1
        while True:
            response = requests.get(
                f'https://api.github.com/users/{self.user}/followers',
                headers=self._auth_headers(),
                params={"per_page": 100, "page": page},
                timeout=self._REQUEST_TIMEOUT,
            )
            # An error payload is a non-empty JSON object, so without this
            # check the loop would never see an empty page and would keep
            # requesting pages forever.
            response.raise_for_status()
            batch = response.json()
            if not batch:
                break
            followers.extend(batch)
            page += 1

        rows = [(follower['login'],) for follower in followers]
        # An explicit schema is required because Spark cannot infer one
        # from an empty list.
        login_schema = StructType([StructField("login", StringType(), True)])
        return self.spark.createDataFrame(rows, schema=login_schema)

    def enrich_with_github_info(self, df):
        """Look up the GitHub profile of every login in ``df``.

        Args:
            df: DataFrame with a ``login`` column.

        Returns:
            A DataFrame with the columns ``name``, ``company``, ``blog``,
            ``email``, ``bio``, ``public_repos``, ``followers``,
            ``following`` and ``created_at``. The input ``login`` column is
            not carried over.
        """
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("company", StringType(), True),
            StructField("blog", StringType(), True),
            StructField("email", StringType(), True),
            StructField("bio", StringType(), True),
            StructField("public_repos", IntegerType(), True),
            StructField("followers", IntegerType(), True),
            StructField("following", IntegerType(), True),
            StructField("created_at", StringType(), True)
        ])

        # Copy the token into a local so the UDF closure references a plain
        # string rather than ``self`` (which also holds the SparkSession).
        token = self.token

        def fetch_profile(login):
            return get_github_user_info(login, token)

        # The lookup performs an HTTP request. Declaring it nondeterministic
        # stops the optimizer from duplicating the call when the struct
        # fields are projected below.
        get_github_user_info_udf = udf(fetch_profile, returnType=schema).asNondeterministic()

        enriched_df = df.withColumn("github_info", get_github_user_info_udf(df["login"]))

        return enriched_df.select(
            *[f"github_info.{field.name}" for field in schema.fields]
        )

    def execute_extract_api(self):
        """Run the API extraction: list followers, then enrich each one."""
        df = self.get_followers()
        return self.enrich_with_github_info(df=df)

    def extract_csv(self, path):
        """Read CSV data at ``path`` with a header row and multi-line fields enabled."""
        return self.spark.read.format("csv").options(header=True, multiLine=True).load(path)

    def get_count_followers(self):
        """Return the ``followers`` value GitHub reports for ``self.user``.

        The value is ``None`` when the response JSON has no ``followers``
        key (for example an error payload).
        """
        url = f"https://api.github.com/users/{self.user}"
        response = requests.get(
            url, headers=self._auth_headers(), timeout=self._REQUEST_TIMEOUT
        )
        return response.json().get('followers')


In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_replace, to_date, date_format

class DataLoaderIfood:
    """Writes DataFrames as CSV to the location given at construction."""

    def __init__(self, path: str):
        self.path = path

    def save_to_csv(self, df: DataFrame):
        """Write ``df`` to ``self.path`` as CSV with a header row.

        The frame is coalesced to one partition before writing, and any
        existing output at the path is overwritten.
        """
        single_partition = df.coalesce(1)
        writer = single_partition.write.mode("overwrite").option("header", True)
        writer.csv(self.path)


In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_replace, to_date, date_format

class DataTransformerIfood:
    """Column-level clean-up applied to the enriched follower data."""

    @staticmethod
    def transform(df: DataFrame) -> DataFrame:
        """Strip ``@`` from ``company`` and reformat ``created_at``.

        ``created_at`` is parsed with the pattern
        ``yyyy-MM-dd'T'HH:mm:ss'Z'`` and rendered back as ``dd/MM/yyyy``.
        """
        company_without_at = regexp_replace("company", "@", "")
        created_date = to_date("created_at", "yyyy-MM-dd'T'HH:mm:ss'Z'")
        created_formatted = date_format(created_date, "dd/MM/yyyy")

        return (
            df
            .withColumn("company", company_without_at)
            .withColumn("created_at", created_formatted)
        )


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  col, regexp_extract

class DataTestIfood:
    """Prints the outcome of sanity checks on the CSV stored at ``path``."""

    def __init__(self, spark: SparkSession, path, user, token):
        self.spark = spark
        self.path = path
        self.user = user
        self.token = token
        self.headers = {"Authorization": f"token {token}"}

    def read_data(self):
        """Load the CSV at ``self.path`` via ``DataExtractorIfood.extract_csv``."""
        extractor = DataExtractorIfood(spark=self.spark, user=self.user, token=self.token)
        return extractor.extract_csv(path=self.path)

    def clean_company(self, df):
        """Print whether any ``company`` value still contains an ``@``."""
        rows_with_at = df.filter(col("company").like("%@%")).count()
        if rows_with_at == 0:
            print("Não existe '@' na coluna 'company'.")
            return
        print("Existe pelo menos um '@' na coluna 'company'.")

    def date_format(self, df):
        """Print whether every ``created_at`` value looks like dd/mm/yyyy."""
        looks_like_date = regexp_extract(col("created_at"), r'^\d{2}/\d{2}/\d{4}$', 0) != ""
        matching_rows = df.filter(looks_like_date).count()
        total_rows = df.count()

        print(
            "A coluna 'created_at' está com as datas no formato dd/mm/yyyy."
            if matching_rows == total_rows
            else "A coluna 'created_at' não está com todas as datas no formato dd/mm/yyyy."
        )

    def count_followers(self, df):
        """Print whether the row count of ``df`` equals GitHub's follower count."""
        extractor = DataExtractorIfood(spark=self.spark, user=self.user, token=self.token)
        followers_count = extractor.get_count_followers()

        message = (
            "A quantidade de seguidores em conformidade a quantidade atual de seguidores no GitHub."
            if df.count() == followers_count
            else "A quantidade de seguidores não está igual"
        )
        return print(message)

    def execute_test(self):
        """Read the CSV once and run every check against it, in order."""
        print("Resultado das verificações de teste no Dataframe.")
        df = self.read_data()
        for check in (self.clean_company, self.date_format, self.count_followers):
            check(df=df)
        

In [None]:
# Run configuration.
# The GitHub login and token are read from the environment so that secrets
# are never typed into (or committed with) the notebook source. Each falls
# back to an empty string when the variable is not set.
user = os.environ.get('GITHUB_USER', '')
token = os.environ.get('GITHUB_TOKEN', '')
# Directory the CSV output is written to and later read back from.
path = '/app/output/'


In [None]:
# Session-level writer settings applied before any output is produced.

# Commit protocol class used by Spark SQL data source writes.
spark.conf.set(
    "spark.sql.sources.commitProtocolClass",
    "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
)
# Turn off Parquet summary metadata.
spark.conf.set(
    "parquet.enable.summary-metadata",
    "false",
)
# Turn off marking of successful jobs by the file output committer
# (presumably to keep marker files out of the output directory).
spark.conf.set(
    "mapreduce.fileoutputcommitter.marksuccessfuljobs",
    "false",
)

In [None]:
# One object per pipeline stage, all built from the shared configuration.
extractor = DataExtractorIfood(spark, user, token)
transformer = DataTransformerIfood()
loader = DataLoaderIfood(path=path)
test = DataTestIfood(spark, path, user, token)

In [None]:
# Extract from the GitHub API, transform, write the CSV, then run the
# checks against what was written.
df_with_github_info = extractor.execute_extract_api()
df_transformed = transformer.transform(df=df_with_github_info)
loader.save_to_csv(df=df_transformed)
test.execute_test()

Resultado das verificações de teste no Dataframe.
Não existe '@' na coluna 'company'.
A coluna 'created_at' está com as datas no formato dd/mm/yyyy.
A quantidade de seguidores em conformidade a quantidade atual de seguidores no GitHub.


## Testes

In [None]:
from pyspark.sql.functions import  col, regexp_extract

In [None]:
# Reload the written CSV (header row, multi-line fields) for the manual checks below.
df_test = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("multiLine", True)
    .load(path)
)

In [None]:
# Report whether any 'company' value still contains an '@'.
print(
    "Existe pelo menos um '@' na coluna 'company'."
    if df_test.filter(col("company").like("%@%")).count() > 0
    else "Não existe '@' na coluna 'company'."
)

In [None]:
# Report whether every 'created_at' value matches two digits / two digits / four digits.
pattern = r'^\d{2}/\d{2}/\d{4}$'
print(
    "A coluna 'created_at' está com as datas no formato dd/mm/yyyy."
    if df_test.filter(regexp_extract(col("created_at"), pattern, 0) != "").count() == df_test.count()
    else "A coluna 'created_at' não está com todas as datas no formato dd/mm/yyyy."
)

In [None]:
# Render the reloaded CSV for manual inspection.
# NOTE(review): `DataFrame.display()` is not part of the open-source PySpark
# API; presumably the notebook runtime adds it (e.g. Databricks) — confirm
# before running this cell elsewhere.
df_test.display()