In [1]:
from abc import ABC, abstractmethod
class Extract(ABC):
    """Abstract base for every data source of the ETL pipeline.

    Concrete subclasses override ``extract``; the subclasses in this notebook
    return the result of a Spark reader's ``load()``.
    """

    @abstractmethod
    def extract(self):
        """Read the source and return its data. Must be overridden."""
        message = "Subclass must implement abstract method"
        raise NotImplementedError(message)
        
        
        
class ExtractFromOracle(Extract):
    """Extract a table or a subquery from an Oracle database over JDBC.

    Args:
        sqlContext: Spark SQL entry point whose ``read`` builds the reader.
        url: JDBC URL of the database.
        query_or_table: value of the JDBC "dbtable" option, either a table
            name or a parenthesised subquery.
        user: database user.
        password: database password.
    """

    def __init__(self, sqlContext, url, query_or_table, user, password):
        self.sqlContext = sqlContext
        self.url = url
        self.query_or_table = query_or_table
        self.user = user
        self.password = password

    def extract(self):
        """Return the DataFrame produced by the configured JDBC reader."""
        reader = self.sqlContext.read.format("jdbc")
        jdbc_options = (
            ("url", self.url),
            ("dbtable", self.query_or_table),
            ("user", self.user),
            ("password", self.password),
        )
        for key, value in jdbc_options:
            reader = reader.option(key, value)
        return reader.load()
class ExtractFromFile(Extract):
    """Extract a comma-separated CSV file that has a header row.

    Args:
        s: path handed to ``read.load``. It is stored as ``self.schema``,
            but it is the file location, not a Spark schema.
        sqlContext: Spark SQL entry point whose ``read`` builds the reader.
    """

    def __init__(self, s, sqlContext):
        self.schema = s
        self.sqlContext = sqlContext

    def extract(self):
        """Load the CSV, letting Spark infer the column types from the data."""
        csv_options = {
            "format": "csv",
            "sep": ",",
            "inferSchema": "true",
            "header": "true",
        }
        return self.sqlContext.read.load(self.schema, **csv_options)
class ExtractFromJson(Extract):
    """Extract a JSON source into a DataFrame.

    Args:
        lien: path of the JSON source, passed to ``read.load``.
        sqlContext: optional Spark SQL entry point (SQLContext or
            SparkSession) used to read the file. It is stored as
            ``self.spark``, so it can also be assigned after construction.
    """

    def __init__(self, lien, sqlContext=None):
        self.lien = lien
        # ``extract`` reads from ``self.spark``. The attribute must exist even
        # when no context is given, so the failure below is explicit rather
        # than an AttributeError.
        self.spark = sqlContext

    def extract(self):
        """Load ``self.lien`` as JSON.

        Raises:
            ValueError: if no Spark SQL entry point was provided.
        """
        if self.spark is None:
            raise ValueError(
                "ExtractFromJson needs a sqlContext/SparkSession to read %r" % (self.lien,)
            )
        return self.spark.read.load(self.lien, format="json")


In [2]:
class Transform:
    """Base class for the transformation stage.

    Keeps the Spark SQL and Spark contexts for subclasses. The helper methods
    below only operate on the DataFrame they are given.
    """

    def __init__(self, sqlContext, sparkContext):
        self.sparkContext = sparkContext
        self.sqlContext = sqlContext

    def CastColumns(self, dataFrame, listColumns):
        """Return ``dataFrame`` with some columns cast to new types.

        Args:
            dataFrame: input DataFrame.
            listColumns: sequence of ``(column_name, type)`` pairs, applied in
                order. Each cast reads the column from the frame produced by
                the previous step.

        Returns:
            The DataFrame with every cast applied.
        """
        current = dataFrame
        for spec in listColumns:
            column_name, target_type = spec[0], spec[1]
            current = current.withColumn(column_name, current[column_name].cast(target_type))
        return current

    def RemplacerAllNan(self, df, d=0):
        """Fill missing values of ``df`` with the single value ``d`` (default 0)."""
        return df.fillna(d)

    def RemplacerAllNanDict(self, df, dictionnaire):
        """Fill missing values per column using the ``{column: value}`` mapping."""
        return df.fillna(dictionnaire)

    
class TransformCDR(Transform):
    """Transformations on call-detail records (CDR)."""

    def setTableCDR(self, dataFrame, date):
        """Keep the records of ``date`` whose subscriber pair is reciprocal.

        A row (FROM_SUBSCRIBER_ID, TO_SUBSCRIBER_ID) is kept only when the
        same day also contains the reversed pair (TO, FROM).

        Args:
            dataFrame: CDR DataFrame with at least CALL_DATE,
                FROM_SUBSCRIBER_ID, TO_SUBSCRIBER_ID, CALLS, SMS, DURATION
                and CALLING_DAYS.
            date: value compared against CALL_DATE.

        Returns:
            DataFrame with columns CALL_DATE, FROM_SUBSCRIBER_ID,
            TO_SUBSCRIBER_ID, CALLS, SMS, DURATION, CALLING_DAYS.

        Side effects:
            Registers (or replaces) the temporary views "cdrSelected" and
            "cdr", which the SQL query below reads.
        """
        day = dataFrame.where(dataFrame.CALL_DATE == date)

        # intersect() compares rows positionally. Intersecting (FROM, TO) with
        # (TO, FROM) keeps the pairs that occur in both directions; column
        # names come from the left side.
        forward = day.select(day.FROM_SUBSCRIBER_ID, day.TO_SUBSCRIBER_ID)
        backward = day.select(day.TO_SUBSCRIBER_ID, day.FROM_SUBSCRIBER_ID)
        mutual_pairs = forward.intersect(backward)

        # createOrReplaceTempView is the non-deprecated form of registerTempTable.
        mutual_pairs.createOrReplaceTempView("cdrSelected")
        day.createOrReplaceTempView("cdr")
        return self.sqlContext.sql("select CALL_DATE,cdr.FROM_SUBSCRIBER_ID,cdr.TO_SUBSCRIBER_ID,CALLS,SMS,DURATION,CALLING_DAYS from cdr,cdrSelected where cdr.FROM_SUBSCRIBER_ID=cdrSelected.FROM_SUBSCRIBER_ID and cdr.TO_SUBSCRIBER_ID=cdrSelected.TO_SUBSCRIBER_ID")
        
class TransformComportement(Transform):
    """Builds the per-contract behaviour table ("comportement")."""

    # Call-usage measures read from both the outgoing and the incoming table.
    USAGE_COLUMNS = (
        'NB_APPEL', 'DUREE_APPEL', 'NB_APPEL_TT_GSM', 'DUREE_APPEL_TT_GSM',
        'DUREE_APPEL_TT_FIXE', 'NB_APPEL_TT_FIXE',
    )

    @classmethod
    def _suffixed_usage(cls, df, suffix):
        """Project CODE_CONTRAT plus every usage column renamed with ``suffix``."""
        return df.selectExpr(
            "CODE_CONTRAT",
            *["{0} as {0}{1}".format(col, suffix) for col in cls.USAGE_COLUMNS]
        )

    def setTableComportement(self, df1, df2, df3, df4, df5):
        """Inner-join five sources on CODE_CONTRAT.

        Args:
            df1: outgoing usage; usage columns get the suffix "_out".
            df2: incoming usage; usage columns get the suffix "_in".
            df3: contract attributes (offer, 3G/4G flags, offer changes).
            df4: data-usage measures.
            df5: last-activity dates.

        Returns:
            DataFrame keyed by CODE_CONTRAT. Only contracts present in all
            five inputs are kept.
        """
        # NOTE(review): df1 also carries MONTH_DT, which is not part of the
        # result. If the sources hold several months per contract, these
        # joins multiply rows. Confirm whether MONTH_DT should be kept or
        # filtered on.
        outgoing = self._suffixed_usage(df1, "_out")
        incoming = self._suffixed_usage(df2, "_in")
        contract = df3.select('CODE_CONTRAT', 'ID_OFFRE', 'FLAG_3G', 'FLAG_4G',
                              'NB_CHANGEMENT_OFFRE', 'LAST_DATE_CHANGEMENT_OFFRE')
        data_usage = df4.select('CODE_CONTRAT', 'NB_JR_ACTIVITE_DATA', 'VOLUME_SESSION')
        activity = df5.select('CODE_CONTRAT', 'LAST_EVENT_DATE', 'DERNIERE_DATE_VOIX_SORTANT',
                              'DERNIERE_DATE_SMS_SORTANT', 'DERNIERE_DATE_DATA')

        result = outgoing
        for other in (incoming, contract, data_usage, activity):
            result = result.join(other, on="CODE_CONTRAT")
        return result
    
            
        
        
            
        
            
            
        
    

In [3]:
class Load:
    """Writes DataFrames to Oracle tables over JDBC."""

    # JDBC URL used when the caller does not pass one.
    DEFAULT_URL = "jdbc:oracle:thin:@localhost:1522:xe"

    def loadDataFrame(self, dataFrame, tableName, user, password, mode="OverWrite", url=DEFAULT_URL):
        """Write ``dataFrame`` into ``tableName``.

        Args:
            dataFrame: DataFrame to write.
            tableName: target table (JDBC "dbtable" option).
            user: database user.
            password: database password.
            mode: Spark save mode passed to ``DataFrameWriter.mode``.
            url: JDBC URL of the target database; defaults to DEFAULT_URL.
        """
        # "truncate" only matters in overwrite mode. Spark then truncates the
        # existing table instead of dropping and recreating it, which keeps
        # the table definition.
        (
            dataFrame.write.mode(mode)
            .format("jdbc")
            .option("url", url)
            .option("dbtable", tableName)
            .option("user", user)
            .option("password", password)
            .option("truncate", "true")
            .save()
        )

In [4]:
class ETL:
    """Pipeline that extracts six Oracle sources, transforms them, and loads two tables.

    ``job`` extracts every source, then repartitions and caches it. It builds
    the reciprocal-calls CDR table and the per-contract behaviour table, and
    writes them to the tables "cdr" and "comportement".
    """

    # Source database. Load() writes through its own JDBC URL; keep the two in sync.
    JDBC_URL = "jdbc:oracle:thin:@localhost:1522:xe"
    DEFAULT_USER = "telecom"
    # FIXME(security): plaintext password kept only as a backward-compatible
    # fallback. Set ETL_DB_PASSWORD (or pass `password=`), then rotate and remove it.
    DEFAULT_PASSWORD = "97908631"

    # JDBC "dbtable" of the CDR source: calls where both numbers are on
    # network 'TT' and DURATION > 5.
    CDR_QUERY = '(select "CALL_DATE","FROM_SUBSCRIBER_ID","TO_SUBSCRIBER_ID","CALLS","SMS","DURATION","CALLING_DAYS" from fact_dw_cla_monthly_trafic_msc where "A_NUMBER_NETWORK"=\'TT\' and "B_NUMBER_NETWORK"= \'TT\' and "DURATION" >5 )'

    # Tables passed, in this order, to TransformComportement.setTableComportement.
    COMPORTEMENT_TABLES = (
        "FACT_USAGE_MONTHLY_SORTANT_B",
        "FACT_USAGE_MONTHLY_ENTRANT_B",
        "DIM_CONTRACT_D",
        "FACT_USAGE_MOUNTHLY_DATA_B",
        "DIM_PARC_RGS_D",
    )

    # CALL_DATE value selected when job() is called without a date.
    DEFAULT_DATE = "30Jun2019 0:00:00"

    # (column, Spark type) casts applied to the CDR source before filtering.
    CDR_CASTS = [('FROM_SUBSCRIBER_ID', 'int'), ('TO_SUBSCRIBER_ID', 'int'), ('SMS', 'int')]

    def __init__(self, sqlContext, sparkContext, user=None, password=None):
        """Create the extractors, transformers and loader.

        Args:
            sqlContext: Spark SQL entry point used for reading and SQL.
            sparkContext: Spark context passed on to the transformers.
            user: database user. Falls back to $ETL_DB_USER, then DEFAULT_USER.
            password: database password. Falls back to $ETL_DB_PASSWORD, then
                DEFAULT_PASSWORD.
        """
        import os

        self.sqlContext = sqlContext
        self.sparkContext = sparkContext
        self.user = user if user is not None else os.environ.get("ETL_DB_USER", self.DEFAULT_USER)
        self.password = (password if password is not None
                         else os.environ.get("ETL_DB_PASSWORD", self.DEFAULT_PASSWORD))
        self.DataFrames = []
        # Index 0 is the CDR subquery; indices 1-5 are COMPORTEMENT_TABLES in order.
        sources = (self.CDR_QUERY,) + self.COMPORTEMENT_TABLES
        self.Extracters = [
            ExtractFromOracle(self.sqlContext, self.JDBC_URL, source, self.user, self.password)
            for source in sources
        ]
        self.TransformCDR = TransformCDR(self.sqlContext, self.sparkContext)
        self.TransformComportement = TransformComportement(self.sqlContext, self.sparkContext)
        self.loder = Load()

    def job(self, date=DEFAULT_DATE):
        """Run extract, transform and load once.

        Args:
            date: CALL_DATE value used to select the CDR day.
        """
        # Rebuild the list on every run. Appending to it made a second job()
        # call keep working on the first run's frames at indices 0-5.
        self.DataFrames = [e.extract().repartition(10).cache() for e in self.Extracters]

        # NOTE(review): a cast to 'int' yields null for IDs outside the 32-bit
        # range. Confirm subscriber IDs fit.
        self.DataFrames[0] = self.TransformCDR.CastColumns(self.DataFrames[0], self.CDR_CASTS)
        # NOTE(review): CALL_DATE is compared with a string such as
        # "30Jun2019 0:00:00". If CALL_DATE is a date/timestamp column, confirm
        # Spark can parse this format; otherwise no row matches.
        cdr_table = self.TransformCDR.setTableCDR(self.DataFrames[0], date)
        comportement_table = self.TransformComportement.setTableComportement(*self.DataFrames[1:])

        self.loder.loadDataFrame(cdr_table, "cdr", self.user, self.password)
        self.loder.loadDataFrame(comportement_table, "comportement", self.user, self.password)
     

In [5]:
def main():
    """Run the ETL job on a local Spark master, then release Spark.

    Creates a SparkContext and SQLContext and runs ``ETL.job()``. It always
    stops the context, even when the job fails, and prints "end" on success.
    """
    from pyspark import SparkContext, SparkConf
    from pyspark import SQLContext

    spark_config = (
        SparkConf()
        .setMaster("local")
        .setAppName('etl')
        .set("spark.ui.port", "4050")
        .set("spark.sql.crossJoin.enabled", "true")
        .set("spark.sql.shuffle.partitions", "1")
    )
    sc = SparkContext(conf=spark_config)
    try:
        sqlContext = SQLContext(sc)
        etl = ETL(sqlContext, sc)
        etl.job()
    finally:
        # Only one SparkContext may be active per process. Without stop(), a
        # failed or repeated run leaves it alive and the next main() call
        # cannot create a new one.
        sc.stop()
    print("end")
    
    

In [6]:
# Run the whole pipeline; main() prints "end" once the job has finished.
main()

end
