In [1]:
import pandas as pd
# Let pandas show up to 512 columns when a DataFrame is displayed in the notebook.
pd.set_option("display.max_columns", 512)

In [2]:
from pyspark import StorageLevel
from pyspark.sql.functions import col, when, concat, year, month, lpad, lit, udf, hour, concat_ws, unix_timestamp
from pyspark.sql.types import LongType, StringType, DoubleType, TimestampType

In [3]:
import graphframes

In [4]:
# Directory used by the DataFrame.checkpoint() calls made later in this notebook.
# NOTE(review): `spark` is not created in the visible cells (presumably injected by the
# kernel), and the path points at one specific user's HDFS home — confirm before re-running
# as a different user.
spark.sparkContext.setCheckpointDir("hdfs:///user/jsotovi2/spark_checkpoints")

In [5]:
# Partition values (reporting period) used to filter every source table read below.
# NOTE: these assignments rebind the `year` and `month` names imported from
# pyspark.sql.functions in an earlier cell; from here on they are plain integers.
year, month = 2017, 9

## Listados de teléfonos que ya son clientes de Vodafone (tanto prepago como pospago)

In [6]:
# Prepaid source table, restricted to the partition of the selected year and month.
acFinalPrepago = (
    spark.read.table("raw_es.vf_pre_ac_final")
    .filter((col("year") == year) & (col("month") == month))
)

In [7]:
# Postpaid source table, restricted to the partition of the selected year and month.
acFinalPospago = (
    spark.read.table("raw_es.vf_pos_ac_final")
    .filter((col("year") == year) & (col("month") == month))
)

In [8]:
# Sanity check: stop early when either customer table is (nearly) empty for the selected
# period. Each count() is a Spark action. The message names the offending table and the
# row count so a failure can be diagnosed without re-running the query by hand.
_MIN_AC_FINAL_ROWS = 1000

for _ac_table_name, _ac_table in (("raw_es.vf_pre_ac_final", acFinalPrepago),
                                  ("raw_es.vf_pos_ac_final", acFinalPospago)):
    _ac_rows = _ac_table.count()
    assert _ac_rows > _MIN_AC_FINAL_ROWS, (
        "%s has only %d rows for year=%s, month=%s (expected more than %d)"
        % (_ac_table_name, _ac_rows, year, month, _MIN_AC_FINAL_ROWS)
    )

In [9]:
# Lookup table: phone number (`msisdn`) -> customer type (`es_vodafone`), built from the
# prepaid table (key column `msisdn`) and the postpaid table (key column `x_id_red`).
#
# The lookup must hold exactly ONE row per number: it is left-joined onto the graph
# vertices, so a number present in both sources would otherwise produce two vertices with
# the same id and inflate every motif-based count. Duplicates are therefore collapsed
# after the union with a deterministic tie-break: the lexicographic minimum of the label,
# which means "pospago" wins over "prepago".
# NOTE(review): confirm that preferring "pospago" matches the business rule.
lookup_cliente = (acFinalPrepago
                  .select(col("msisdn"), lit("prepago").alias("es_vodafone"))
                  .union(acFinalPospago
                         .select(col("x_id_red").alias("msisdn"),
                                 lit("pospago").alias("es_vodafone"))
                        )
                  .groupBy("msisdn")
                  # dict-style agg names its output "min(es_vodafone)"; restore the name.
                  .agg({"es_vodafone": "min"})
                  .withColumnRenamed("min(es_vodafone)", "es_vodafone")
                 )

# CDRs

In [10]:
def remove_trailing_zeros_and_34(number):
    """Strip every leading "0" and leading "33" prefix from a phone-number string.

    Despite the name, the function works on the START of the string (not the
    trailing end) and removes the prefix "33" (not "34"); the original author
    flagged the "33" as intentional. Prefixes are removed repeatedly until the
    string starts with neither, so "0033612" becomes "612".

    Args:
        number: the raw number as a string, or None.

    Returns:
        The number without those prefixes (possibly the empty string), or None
        when `number` is None so that a null value does not crash the Spark UDF
        built from this function.
    """
    if number is None:
        return None
    # Iterative on purpose: recursing once per stripped prefix could exceed the
    # interpreter's recursion limit on a pathological input.
    while True:
        if number.startswith("0"):
            number = number[1:]
        elif number.startswith("33"):  # Indeed, 33
            number = number[2:]
        else:
            return number
    
# Spark UDF wrapper (string result) around the prefix-stripping function, so it can be
# applied to DataFrame columns.
clean_number_udf = udf(remove_trailing_zeros_and_34, StringType())

In [11]:
# Raw CDR rows of the selected year/month that have both number columns
# (`nrprima`, `nrsecun`) populated.
cdrs_raw = (
    spark.read.table("raw_es.mediated_cdr_navajo")
    .filter((col("year") == year) & (col("month") == month))
    .dropna(subset=["nrprima", "nrsecun"])
)

# Target of roughly three million raw rows per partition. The partition count is clamped
# to at least 1 because DataFrame.repartition() rejects 0, which is what the integer
# division yields whenever the period holds fewer rows than the target.
_CDR_ROWS_PER_PARTITION = 3000000
_cdr_num_partitions = max(1, cdrs_raw.count() // _CDR_ROWS_PER_PARTITION)

# Cleaned call records: one row per (numeroorigen, numerodestino, dateofcall).
#   numeroorigen / numerodestino : `nrprima` / `nrsecun` passed through clean_number_udf
#   dateofcall                   : timestamp parsed from `dtinilla` + " " + `hrinilla`
#                                  with the pattern "yyyyMMdd HHmmss"
#   airduration                  : `tmdurlla`, renamed
# Built on top of `cdrs_raw`, which already applies the same table read, period filter
# and null drop, instead of repeating that pipeline.
cdrs_full = (cdrs_raw
            .withColumn("numeroorigen", clean_number_udf(col("nrprima")))
            .withColumn("numerodestino", clean_number_udf(col("nrsecun")))
            .withColumn("dateofcall", unix_timestamp(concat_ws(" ", col("dtinilla"), col("hrinilla")),
                                                     format="yyyyMMdd HHmmss").cast(TimestampType()))
            .withColumnRenamed("tmdurlla", "airduration")
            .select("numeroorigen",
                     "numerodestino",
                     "dateofcall",
                     "airduration"
                    )
            .repartition(_cdr_num_partitions)
            .dropDuplicates(subset=["numeroorigen", "numerodestino", "dateofcall"])
            ).checkpoint(eager=False)

In [12]:
# Record counts per origin number and per destination number.
# NOTE: despite the "too_many_" prefix, both frames keep the numbers with AT MOST
# 10000 records, i.e. the numbers that are allowed to stay in the data set.
too_many_numeroorigen_calls = (
    cdrs_full
    .groupBy("numeroorigen")
    .count()
    .filter(col("count") <= 10000)
)

too_many_numerodestino_calls = (
    cdrs_full
    .groupBy("numerodestino")
    .count()
    .filter(col("count") <= 10000)
)

# The inner joins work as a whitelist: a record survives only when both its origin
# and its destination number are within the volume limit above.
_allowed_origins = too_many_numeroorigen_calls.select("numeroorigen")
_allowed_destinations = too_many_numerodestino_calls.select("numerodestino")

cdrs = (
    cdrs_full
    .join(_allowed_origins, on="numeroorigen", how="inner")
    .join(_allowed_destinations, on="numerodestino", how="inner")
)

In [13]:
# Origin and destination number columns of the filtered call records.
origenes = cdrs.select("numeroorigen")
destinos = cdrs.select("numerodestino")

# Distinct origin numbers. The enrichment with `tolocation` / `destino` that used to be
# joined in at this point is disabled, so this is just the de-duplicated origin list.
origenes_mas_completos = origenes.dropDuplicates(subset=["numeroorigen"])


# Graph vertices: every distinct, non-null number seen as origin or destination,
# exposed in a single column named `id`.
_origin_ids = (
    origenes_mas_completos
    .dropDuplicates(subset=["numeroorigen"])
    .withColumnRenamed("numeroorigen", "id")
)
_destination_ids = (
    destinos
    .dropDuplicates(subset=["numerodestino"])
    .withColumnRenamed("numerodestino", "id")
)

vertices = (
    _origin_ids
    .union(_destination_ids)
    .dropDuplicates(subset=["id"])
    .dropna(subset=["id"])
)

# Vertices tagged with their customer type: the label found in `lookup_cliente`
# ("prepago" / "pospago") or the literal "no" when the number has no match there.
_customer_type = (
    when(col("es_vodafone").isNotNull(), col("es_vodafone"))
    .otherwise("no")
)

vertices_info_vf = (
    vertices
    .join(lookup_cliente,
          on=vertices["id"] == lookup_cliente["msisdn"],
          how="left")
    .withColumn("es_vodafone", _customer_type)
    .select("id", "es_vodafone")
    .repartition(300)
    .checkpoint(eager=False)
)

# Graph edges: one row per call record, with origin/destination exposed under the
# `src` / `dst` column names that GraphFrames expects.
edges = (
    cdrs
    .select(col("numeroorigen").alias("src"),
            col("numerodestino").alias("dst"),
            col("dateofcall"),
            col("airduration"))
    .repartition(300)
    .checkpoint(eager=False)
)

# Full call graph: numbers (with their customer type) as vertices, calls as edges.
grafo_completo = graphframes.GraphFrame(vertices_info_vf, edges)

In [14]:
# Time-of-day slots as (label, hour spec) pairs. The label becomes a suffix of the
# generated feature column names. A spec is either one inclusive (first_hour, last_hour)
# pair, or a list of such pairs for a slot that wraps around midnight.
hours = [
    ("6AM_14PM", (6, 14)),
    ("15PM_18PM", (15, 18)),
    ("19PM_1AM", [(19, 23), (0, 1)]),
    ("2AM_5AM", (2, 5)),
]


def _hour_ranges(time_slot):
    """Normalise an hour spec into a list of inclusive (first_hour, last_hour) pairs.

    `time_slot` is either a single pair such as (6, 14) or a sequence of pairs such
    as [(19, 23), (0, 1)]. The shape is inspected explicitly instead of relying on
    a TypeError raised while indexing, which would also hide unrelated TypeErrors.
    """
    if isinstance(time_slot[0], (list, tuple)):
        return [tuple(pair) for pair in time_slot]
    return [tuple(time_slot)]


def _hour_condition(time_slot):
    """Column predicate: hour of `dateofcall` falls in any range of `time_slot`."""
    call_hour = hour(col("dateofcall"))
    condition = None
    for first_hour, last_hour in _hour_ranges(time_slot):
        in_range = call_hour.between(first_hour, last_hour)
        condition = in_range if condition is None else (condition | in_range)
    return condition


def _sum_airduration_by(edges_df, key, out_col):
    """Sum of `airduration` per `key`, returned as a double column named `out_col`."""
    return (edges_df
            .groupBy(key).sum("airduration")
            .withColumnRenamed("sum(airduration)", out_col)
            .withColumn(out_col, col(out_col).cast(DoubleType()))
           )


def _share_of_total(total_df, part_df, key, total_col, part_col, ratio_col):
    """Left-join `part_df` onto `total_df` and add `ratio_col` = part / total.

    Both operands are cast to double (a no-op when they already are), the total
    column is dropped afterwards and missing numeric values are filled with 0.0,
    so the result holds `key`, `part_col` and `ratio_col`.
    """
    return (total_df
            .join(part_df, how="left", on=key)
            .withColumn(total_col, col(total_col).cast(DoubleType()))
            .withColumn(part_col, col(part_col).cast(DoubleType()))
            .withColumn(ratio_col, col(part_col) / col(total_col))
            .drop(total_col)
            .na.fill(0.0)
           )


def _left_join_all(base, frames, key):
    """Left-join every DataFrame of `frames` onto `base` on `key`, in list order."""
    joined = base
    for frame in frames:
        joined = joined.join(frame, how="left", on=key)
    return joined


# One (label, features DataFrame) entry per time slot.
in_out_dfs = []

# Customer types for which Vodafone-specific ratios are computed (loop-invariant).
vodafone_types = ["prepago", "pospago"]

for literal, time_slot in hours:
    edges_filtered_by_hour = (grafo_completo
                              .edges
                              .filter(_hour_condition(time_slot))
                              )

    grafo_completo_filtered_by_hour = (graphframes.GraphFrame(grafo_completo.vertices,
                                                              edges_filtered_by_hour)
                                       .persist(StorageLevel.DISK_ONLY)
                                      )

    ##
    ## General graph statistics
    ##
    full_indegrees = (grafo_completo_filtered_by_hour
                     .inDegrees
                     .withColumnRenamed("inDegree", "received_calls_"+literal)
                     .persist(StorageLevel.DISK_ONLY)
                     )

    full_outdegrees = (grafo_completo_filtered_by_hour
                       .outDegrees
                       .withColumnRenamed("outDegree", "emited_calls_"+literal)
                       .persist(StorageLevel.DISK_ONLY)
                      )

    # NOTE(review): the "n_mins_*" columns are plain sums of `airduration`; whether that
    # column is expressed in minutes cannot be confirmed from this notebook.
    number_mins_rcv_calls = _sum_airduration_by(grafo_completo_filtered_by_hour.edges,
                                                "dst", "n_mins_received_"+literal)

    number_mins_src_calls = _sum_airduration_by(grafo_completo_filtered_by_hour.edges,
                                                "src", "n_mins_called_"+literal)

    ##
    ## VF-specific statistics
    ##
    # Every edge together with its source vertex `a` and destination vertex `b`;
    # shared by all customer types of this slot.
    call_motifs = grafo_completo_filtered_by_hour.find("(a)-[e]->(b)")

    vodafone_types_dfs = {"vf_received_call_ratio": [],
                          "vf_emited_call_ratio": [],
                          "ratio_mins_rcv_calls_vf": [],
                          "ratio_mins_src_calls_vf": []}

    for vf_type in vodafone_types:
        suffix = vf_type + "_" + literal

        # Calls whose ORIGIN is a customer of this type.
        filtered_edges_entrada = (call_motifs
                                  .filter("a.es_vodafone == '" + vf_type + "'")
                                  .select("e.src", "e.dst", "e.dateofcall", "e.airduration")
                                 )

        calls_from_vodafone = graphframes.GraphFrame(grafo_completo.vertices,
                                                     filtered_edges_entrada)

        vf_indegrees = (calls_from_vodafone
                        .inDegrees
                        .withColumnRenamed("inDegree", "received_vf_calls_"+suffix)
                       )

        vf_received_call_ratio = _share_of_total(full_indegrees, vf_indegrees, "id",
                                                 "received_calls_"+literal,
                                                 "received_vf_calls_"+suffix,
                                                 "received_vf_calls_ratio_"+suffix)

        # Calls whose DESTINATION is a customer of this type.
        filtered_edges_salida = (call_motifs
                                 .filter("b.es_vodafone == '" + vf_type + "'")
                                 .select("e.src", "e.dst", "e.dateofcall", "e.airduration")
                                )

        calls_to_vodafone = graphframes.GraphFrame(grafo_completo.vertices,
                                                   filtered_edges_salida)

        vf_outdegrees = (calls_to_vodafone
                         .outDegrees
                         .withColumnRenamed("outDegree", "emited_vf_calls_"+suffix)
                        )

        vf_emited_call_ratio = _share_of_total(full_outdegrees, vf_outdegrees, "id",
                                               "emited_calls_"+literal,
                                               "emited_vf_calls_"+suffix,
                                               "emited_vf_calls_ratio_"+suffix)

        # Duration received from customers of this type, and its share of all duration received.
        number_mins_rcv_calls_vf = _sum_airduration_by(filtered_edges_entrada,
                                                       "dst", "n_mins_received_vf_"+suffix)

        ratio_mins_rcv_calls_vf = _share_of_total(number_mins_rcv_calls, number_mins_rcv_calls_vf, "dst",
                                                  "n_mins_received_"+literal,
                                                  "n_mins_received_vf_"+suffix,
                                                  "ratio_mins_received_vf_"+suffix)

        # Duration called to customers of this type, and its share of all duration called.
        number_mins_src_calls_vf = _sum_airduration_by(filtered_edges_salida,
                                                       "src", "n_mins_called_vf_"+suffix)

        ratio_mins_src_calls_vf = _share_of_total(number_mins_src_calls, number_mins_src_calls_vf, "src",
                                                  "n_mins_called_"+literal,
                                                  "n_mins_called_vf_"+suffix,
                                                  "ratio_mins_called_vf_"+suffix)

        # Final appends
        vodafone_types_dfs["vf_received_call_ratio"].append(vf_received_call_ratio)
        vodafone_types_dfs["vf_emited_call_ratio"].append(vf_emited_call_ratio)
        vodafone_types_dfs["ratio_mins_rcv_calls_vf"].append(ratio_mins_rcv_calls_vf)
        vodafone_types_dfs["ratio_mins_src_calls_vf"].append(ratio_mins_src_calls_vf)

    # JOINS: attach the per-type columns to the matching slot totals.
    full_vf_received_call_ratio = _left_join_all(full_indegrees,
                                                 vodafone_types_dfs["vf_received_call_ratio"],
                                                 "id")

    full_vf_emited_call_ratio = _left_join_all(full_outdegrees,
                                               vodafone_types_dfs["vf_emited_call_ratio"],
                                               "id")

    full_ratio_mins_rcv_calls_vf = (_left_join_all(number_mins_rcv_calls,
                                                   vodafone_types_dfs["ratio_mins_rcv_calls_vf"],
                                                   "dst")
                                    .withColumnRenamed("dst", "id")
                                   )

    full_ratio_mins_src_calls_vf = (_left_join_all(number_mins_src_calls,
                                                   vodafone_types_dfs["ratio_mins_src_calls_vf"],
                                                   "src")
                                    .withColumnRenamed("src", "id")
                                   )

    ## JOINS: one row per vertex of the full graph. Vertices without calls in this
    ## slot keep nulls here (left joins).
    in_out_features = _left_join_all(grafo_completo.vertices.select("id"),
                                     [full_vf_received_call_ratio,
                                      full_vf_emited_call_ratio,
                                      full_ratio_mins_rcv_calls_vf,
                                      full_ratio_mins_src_calls_vf],
                                     "id")

    in_out_dfs.append((literal, in_out_features))

In [15]:
# Merge the per-slot feature tables into one wide table keyed by vertex `id`.
# Each additional slot is outer-joined and the missing numeric values are then set to 0.0.
in_out_df = in_out_dfs[0][1]

for literal, df in in_out_dfs[1:]:
    _merged = in_out_df.join(df, on="id", how="outer")
    in_out_df = _merged.fillna(0.0)

# Lazy checkpoint: nothing is computed until an action runs on the returned DataFrame.
in_out_df_cached = in_out_df.checkpoint(eager=False)

In [16]:
# Action on the lazily checkpointed feature table: this is what actually triggers its
# computation. The displayed value is the number of rows.
in_out_df_cached.count()

61131626

In [None]:
# Set the number of partitions Spark SQL uses when shuffling data (joins, aggregations)
# for the cells that follow.
spark.conf.set("spark.sql.shuffle.partitions", 4100)

In [None]:
# Iteration cap passed as `maxIter` to GraphFrame.labelPropagation in the loop below.
max_iter_label_propagation = 100

# Accumulator created for the loop below (how it is filled is not visible in this excerpt).
community_dfs = []

for literal, time_slot in hours:
    try:
        edges_filtered_by_hour = (grafo_completo
                                  .edges
                                  .filter((hour(col("dateofcall")).between(time_slot[0][0], 
                                                                           time_slot[0][1]))
                                         | (hour(col("dateofcall")).between(time_slot[1][0], 
                                                                            time_slot[1][1]))
                                         )
                                  .repartition(500, "src")
                                  )
        edges_filtered_by_hour.count()
        
    except TypeError:
        edges_filtered_by_hour = (grafo_completo
                                  .edges
                                  .filter(hour(col("dateofcall")).between(time_slot[0], 
                                                                          time_slot[1]))
                                  .repartition(500, "src")
                                  )
        edges_filtered_by_hour.count()

    grafo_completo_filtered_by_hour = (graphframes.GraphFrame(grafo_completo.vertices,
                                                              edges_filtered_by_hour)
                                       .persist(StorageLevel.DISK_ONLY)
                                      )
    
    ##
    ## General graph statistics
    ##
    label_propagation_by_hour = (grafo_completo_filtered_by_hour
                                 .labelPropagation(maxIter=max_iter_label_propagation)
                                 .checkpoint()
                                )
    
    
    
    label_propagation_by_label = (label_propagation_by_hour
                                  .groupBy("label")
                                  .count()
                                  .withColumn("count", col("count").cast(DoubleType()))
                                  .withColumnRenamed("count", "num_total_users_in_community_"+literal)
                                  .checkpoint()
                                  )
    
    label_propagation_by_hour_join_src = (label_propagation_by_hour
                                          .select("id","label")
                                          .withColumnRenamed("id", "id_src")
                                          .withColumnRenamed("label", "label_src")
                                          )

    label_propagation_by_hour_join_dst = (label_propagation_by_hour
                                          .select("id","label")
                                          .withColumnRenamed("id", "id_dst")
                                          .withColumnRenamed("label", "label_dst")
                                          )

    grafo_completo_edges = grafo_completo_filtered_by_hour.edges

    grafo_completo_edges_j_label_src = (grafo_completo_edges
                                        .join(label_propagation_by_hour_join_src,
                                              how="left",
                                              on=grafo_completo_edges["src"]==
                                              label_propagation_by_hour_join_src["id_src"])
                                       )
     
    grafo_completo_edges_j_label_src_j_dst = (grafo_completo_edges_j_label_src
                                              .join(label_propagation_by_hour_join_dst,
                                                    how="left",
                                                    on=grafo_completo_edges_j_label_src["dst"]==
                                                    label_propagation_by_hour_join_dst["id_dst"])
                                              )

    edges_intracommunity = (grafo_completo_edges_j_label_src_j_dst
                            .where(col("label_src")==col("label_dst"))
                            .withColumnRenamed("label_src", "community_id")
                            .drop("label_dst", "id_src", "id_dst")
                            .checkpoint()
                            )
    
    edges_intracommunity_num_calls = (edges_intracommunity
                                      .groupBy("community_id")
                                      .count()
                                      .withColumn("count", col("count").cast(DoubleType()))
                                      .withColumnRenamed("count", "num_total_calls_in_community_"+literal)
                                      )
    
    edges_intracommunity_minute_calls = (edges_intracommunity
                                         .groupBy("community_id")
                                         .sum("airduration")
                                         .withColumn("sum_airduration", col("sum(airduration)").cast(DoubleType()))
                                         .drop("sum(airduration)")
                                         .withColumnRenamed("sum_airduration", "num_total_minutes_in_community_"+literal)
                                      )
    
    calls_made_to_community_members = (edges_intracommunity
                                       .groupBy("src")
                                       .count()
                                       .withColumn("count", col("count").cast(DoubleType()))
                                       .withColumnRenamed("src","id")
                                       .withColumnRenamed("count","num_calls_made_to_community_members_"+literal)
                                      )
    
    calls_received_from_community_members = (edges_intracommunity
                                             .groupBy("dst")
                                             .count()
                                             .withColumn("count", col("count").cast(DoubleType()))
                                             .withColumnRenamed("dst","id")
                                             .withColumnRenamed("count","num_calls_received_from_community_members_"+literal)
                                             )
    
    minut_made_to_community_members = (edges_intracommunity
                                       .groupBy("src")
                                       .sum("airduration")
                                       .withColumn("sum_airduration", col("sum(airduration)").cast(DoubleType()))
                                       .drop("sum(airduration)")
                                       .withColumnRenamed("src","id")
                                       .withColumnRenamed("sum_airduration","minutes_calls_made_to_community_members_"+literal)
                                      )
    
    minut_recv_from_community_members= (edges_intracommunity
                                       .groupBy("dst")
                                       .sum("airduration")
                                       .withColumn("sum_airduration", col("sum(airduration)").cast(DoubleType()))
                                       .drop("sum(airduration)")
                                       .withColumnRenamed("dst","id")
                                       .withColumnRenamed("sum_airduration","minutes_calls_received_from_community_members_"+literal)
                                      )
    
    
    ##
    ## VF-specific statistics
    ##   
    vodafone_types = ["prepago", "pospago"]
    
    vodafone_types_dfs = {"label_propagation_ratio": [],
                          "edges_intracommunity_num_calls_ratio": [],
                          "edges_intracommunity_minute_calls_ratio": [],
                          "ratio_calls_made_to_vf_community_members": [],
                          "ratio_calls_received_from_vf_community_members":[],
                          "ratio_minut_made_to_vf_community_members": [],
                          "ratio_minut_recv_from_vf_community_members":[]
                         }
    
    for vf_type in vodafone_types:
    ###########################
    ## VF users in community ##
    ###########################
    
        label_propagation_onlyvf_by_label = (label_propagation_by_hour
                                            .where(col("es_vodafone")==vf_type)
                                            .groupBy("label")
                                            .count()
                                            .withColumn("count", col("count").cast(DoubleType()))
                                            .withColumnRenamed("count", "num_vf_users_in_community_"+vf_type+"_"+literal)
                                            )
        
        label_propagation_ratio = (label_propagation_by_label
                                   .join(label_propagation_onlyvf_by_label,
                                         how="left",
                                         on="label")
                                   .withColumn("ratio_vf_users_in_community_"+vf_type+"_"+literal, 
                                               col("num_vf_users_in_community_"+vf_type+"_"+literal)
                                               / col("num_total_users_in_community_"+literal))
                                   .withColumnRenamed("label", "community_id")
                                   .drop("num_total_users_in_community_"+literal)
                                   .na.fill(0.0)
                                   )
    
    #########################################################
    ## Calls in community that are from VF user to VF user ##
    #########################################################

        edges_intracommunity_only_vf_to_vf = (graphframes.GraphFrame(grafo_completo.vertices,
                                                                     edges_intracommunity)
                                              .find("(a)-[e]->(b)")
                                              .filter("a.es_vodafone == '" + vf_type + "'")
                                              .filter("b.es_vodafone == '" + vf_type + "'")
                                              .select("e.src", "e.dst", "e.dateofcall", "e.airduration", "e.community_id")
                                              .checkpoint()
                                              )

        edges_intracommunity_only_vf_to_vf_num_calls = (edges_intracommunity_only_vf_to_vf
                                                        .groupBy("community_id")
                                                        .count()
                                                        .withColumn("count", col("count").cast(DoubleType()))
                                                        .withColumnRenamed("count", "num_vf_to_vf_calls_in_community_"+vf_type+"_"+literal)
                                                        )

        edges_intracommunity_num_calls_ratio = (edges_intracommunity_num_calls
                                               .join(edges_intracommunity_only_vf_to_vf_num_calls,
                                                     how="left",
                                                     on="community_id")
                                               .withColumn("ratio_vf_to_vf_calls_in_community_"+vf_type+"_"+literal, 
                                                           col("num_vf_to_vf_calls_in_community_"+vf_type+"_"+literal)
                                                           / col("num_total_calls_in_community_"+literal)
                                                          )
                                               .drop("num_total_calls_in_community_"+literal)
                                               .na.fill(0.0)
                                               )
    
    ###########################################################
    ## Minutes in community that are from VF user to VF user ##
    ###########################################################
    
        # Sum of `airduration` per community over the VF-to-VF edges, cast to
        # double and stored under a vf_type/literal-specific column name.
        vf_minutes_col = "num_vf_to_vf_minutes_in_community_"+vf_type+"_"+literal
        vf_airduration_by_community = (
            edges_intracommunity_only_vf_to_vf
            .groupBy("community_id")
            .sum("airduration")
        )
        edges_intracommunity_only_vf_to_vf_minute_calls = (
            vf_airduration_by_community
            .withColumn("sum_airduration", col("sum(airduration)").cast(DoubleType()))
            .drop("sum(airduration)")
            .withColumnRenamed("sum_airduration", vf_minutes_col)
        )

        # Ratio of that sum over the community total. Communities without a
        # match in the left join end up with 0.0 after the final fill.
        total_minutes_col = "num_total_minutes_in_community_"+literal
        vf_minutes_ratio_col = "ratio_vf_to_vf_minutes_in_community_"+vf_type+"_"+literal
        minutes_with_vf_totals = edges_intracommunity_minute_calls.join(
            edges_intracommunity_only_vf_to_vf_minute_calls,
            on="community_id",
            how="left")
        edges_intracommunity_minute_calls_ratio = (
            minutes_with_vf_totals
            .withColumn(vf_minutes_ratio_col, col(vf_minutes_col) / col(total_minutes_col))
            # The total is only needed to compute the ratio.
            .drop(total_minutes_col)
            .na.fill(0.0)
        )
    
    ################################################################
    ## From the calls that the user makes/receives with community ##
    ##  members, how many are Vodafone                            ##
    ################################################################
        
        # Edges of `edges_intracommunity` whose destination vertex has
        # es_vodafone == vf_type, counted per source (`src`, renamed to `id`).
        calls_made_to_vf_col = "num_calls_made_to_vf_community_members_"+vf_type+"_"+literal
        edges_towards_vf = (
            graphframes.GraphFrame(grafo_completo.vertices, edges_intracommunity)
            .find("(a)-[e]->(b)")
            .filter("b.es_vodafone == '" + vf_type + "'")
            .select("e.src", "e.dst", "e.dateofcall", "e.airduration", "e.community_id")
        )
        calls_made_to_vf_community_members = (
            edges_towards_vf
            .groupBy("src")
            .count()
            .withColumn("count", col("count").cast(DoubleType()))
            .withColumnRenamed("src","id")
            .withColumnRenamed("count", calls_made_to_vf_col)
        )

        # One row per vertex: calls made to `vf_type` members divided by all
        # calls made to community members. Vertices missing from either left
        # join get 0.0 from the final fill.
        calls_made_total_col = "num_calls_made_to_community_members_"+literal
        calls_made_ratio_col = "ratio_calls_made_to_vf_community_members_"+vf_type+"_"+literal
        caller_ids = grafo_completo.vertices.select("id")
        ratio_calls_made_to_vf_community_members = (
            caller_ids
            .join(calls_made_to_community_members, on="id", how="left")
            .join(calls_made_to_vf_community_members, on="id", how="left")
            .withColumn(calls_made_ratio_col,
                        col(calls_made_to_vf_col) / col(calls_made_total_col))
            # The total is only needed to compute the ratio.
            .drop(calls_made_total_col)
            .na.fill(0.0)
        )
    
        
        # Edges of `edges_intracommunity` whose source vertex has
        # es_vodafone == vf_type, counted per destination (`dst`, renamed to `id`).
        calls_recv_from_vf_col = "num_calls_received_from_vf_community_members_"+vf_type+"_"+literal
        edges_from_vf = (
            graphframes.GraphFrame(grafo_completo.vertices, edges_intracommunity)
            .find("(a)-[e]->(b)")
            .filter("a.es_vodafone == '" + vf_type + "'")
            .select("e.src", "e.dst", "e.dateofcall", "e.airduration", "e.community_id")
        )
        calls_received_from_vf_community_members = (
            edges_from_vf
            .groupBy("dst")
            .count()
            .withColumn("count", col("count").cast(DoubleType()))
            .withColumnRenamed("dst","id")
            .withColumnRenamed("count", calls_recv_from_vf_col)
        )

        # One row per vertex: calls received from `vf_type` members divided by
        # all calls received from community members. Vertices missing from
        # either left join get 0.0 from the final fill.
        calls_recv_total_col = "num_calls_received_from_community_members_"+literal
        calls_recv_ratio_col = "ratio_calls_received_from_vf_community_members_"+vf_type+"_"+literal
        receiver_ids = grafo_completo.vertices.select("id")
        ratio_calls_received_from_vf_community_members = (
            receiver_ids
            .join(calls_received_from_community_members, on="id", how="left")
            .join(calls_received_from_vf_community_members, on="id", how="left")
            .withColumn(calls_recv_ratio_col,
                        col(calls_recv_from_vf_col) / col(calls_recv_total_col))
            # The total is only needed to compute the ratio.
            .drop(calls_recv_total_col)
            .na.fill(0.0)
        )
    
    
    ##################################################################
    ## From the minutes that the user makes/receives with community ##
    ##  members, how many are Vodafone                              ##
    ##################################################################
        
        # Sum of `airduration` per source (`src`, renamed to `id`) over the
        # edges whose destination vertex has es_vodafone == vf_type.
        minutes_made_to_vf_col = "minutes_calls_made_to_vf_community_members_"+vf_type+"_"+literal
        edges_towards_vf_for_minutes = (
            graphframes.GraphFrame(grafo_completo.vertices, edges_intracommunity)
            .find("(a)-[e]->(b)")
            .filter("b.es_vodafone == '" + vf_type + "'")
            .select("e.src", "e.dst", "e.dateofcall", "e.airduration", "e.community_id")
        )
        minut_made_to_vf_community_members = (
            edges_towards_vf_for_minutes
            .groupBy("src")
            .sum("airduration")
            .withColumn("sum_airduration", col("sum(airduration)").cast(DoubleType()))
            .drop("sum(airduration)")
            .withColumnRenamed("src","id")
            .withColumnRenamed("sum_airduration", minutes_made_to_vf_col)
        )

        # One row per vertex: airduration towards `vf_type` members divided by
        # the airduration towards all community members. Vertices missing from
        # either left join get 0.0 from the final fill.
        minutes_made_total_col = "minutes_calls_made_to_community_members_"+literal
        minutes_made_ratio_col = "ratio_minutes_calls_made_to_vf_community_members_"+vf_type+"_"+literal
        caller_ids_for_minutes = grafo_completo.vertices.select("id")
        ratio_minut_made_to_vf_community_members = (
            caller_ids_for_minutes
            .join(minut_made_to_community_members, on="id", how="left")
            .join(minut_made_to_vf_community_members, on="id", how="left")
            .withColumn(minutes_made_ratio_col,
                        col(minutes_made_to_vf_col) / col(minutes_made_total_col))
            # The total is only needed to compute the ratio.
            .drop(minutes_made_total_col)
            .na.fill(0.0)
        )
    
        
        # Sum of `airduration` per destination (`dst`, renamed to `id`) over
        # the edges whose source vertex has es_vodafone == vf_type.
        minutes_recv_from_vf_col = "minutes_calls_received_from_vf_community_members_"+vf_type+"_"+literal
        edges_from_vf_for_minutes = (
            graphframes.GraphFrame(grafo_completo.vertices, edges_intracommunity)
            .find("(a)-[e]->(b)")
            .filter("a.es_vodafone == '" + vf_type + "'")
            .select("e.src", "e.dst", "e.dateofcall", "e.airduration", "e.community_id")
        )
        minut_recv_from_vf_community_members = (
            edges_from_vf_for_minutes
            .groupBy("dst")
            .sum("airduration")
            .withColumn("sum_airduration", col("sum(airduration)").cast(DoubleType()))
            .drop("sum(airduration)")
            .withColumnRenamed("dst","id")
            .withColumnRenamed("sum_airduration", minutes_recv_from_vf_col)
        )

        # One row per vertex: airduration received from `vf_type` members
        # divided by the airduration received from all community members.
        # Vertices missing from either left join get 0.0 from the final fill.
        minutes_recv_total_col = "minutes_calls_received_from_community_members_"+literal
        minutes_recv_ratio_col = "ratio_minutes_calls_received_from_vf_community_members_"+vf_type+"_"+literal
        receiver_ids_for_minutes = grafo_completo.vertices.select("id")
        ratio_minut_recv_from_vf_community_members = (
            receiver_ids_for_minutes
            .join(minut_recv_from_community_members, on="id", how="left")
            .join(minut_recv_from_vf_community_members, on="id", how="left")
            .withColumn(minutes_recv_ratio_col,
                        col(minutes_recv_from_vf_col) / col(minutes_recv_total_col))
            # The total is only needed to compute the ratio.
            .drop(minutes_recv_total_col)
            .na.fill(0.0)
        )
        
        # Final appends
        # Store the frames built for the current `vf_type` under their
        # feature-family key; the append order matches the listing below.
        for family_key, family_df in (
                ("label_propagation_ratio", label_propagation_ratio),
                ("edges_intracommunity_num_calls_ratio", edges_intracommunity_num_calls_ratio),
                ("edges_intracommunity_minute_calls_ratio", edges_intracommunity_minute_calls_ratio),
                ("ratio_calls_made_to_vf_community_members", ratio_calls_made_to_vf_community_members),
                ("ratio_calls_received_from_vf_community_members", ratio_calls_received_from_vf_community_members),
                ("ratio_minut_made_to_vf_community_members", ratio_minut_made_to_vf_community_members),
                ("ratio_minut_recv_from_vf_community_members", ratio_minut_recv_from_vf_community_members)):
            vodafone_types_dfs[family_key].append(family_df)
    
    
    # JOINs
    # Left-join every collected `label_propagation_ratio` frame onto the
    # per-label frame, keyed by `community_id` (the renamed `label` column).
    lp_ratio_dfs = vodafone_types_dfs["label_propagation_ratio"]
    full_label_propagation_ratio = (
        label_propagation_by_label
        .withColumnRenamed("label","community_id")
        .join(lp_ratio_dfs[0], on="community_id", how="left")
    )

    for df in lp_ratio_dfs[1:]:
        full_label_propagation_ratio = full_label_propagation_ratio.join(
            df, on="community_id", how="left")
    
    #
    # Left-join every collected call-count ratio frame onto the per-community
    # call totals, keyed by `community_id`.
    num_calls_ratio_dfs = vodafone_types_dfs["edges_intracommunity_num_calls_ratio"]
    full_edges_intracommunity_num_calls_ratio = (
        edges_intracommunity_num_calls
        .join(num_calls_ratio_dfs[0], on="community_id", how="left")
    )

    for df in num_calls_ratio_dfs[1:]:
        full_edges_intracommunity_num_calls_ratio = full_edges_intracommunity_num_calls_ratio.join(
            df, on="community_id", how="left")
    
    #
    # Left-join every collected minutes ratio frame onto the per-community
    # minute totals, keyed by `community_id`.
    minute_calls_ratio_dfs = vodafone_types_dfs["edges_intracommunity_minute_calls_ratio"]
    full_edges_intracommunity_minute_calls_ratio = (
        edges_intracommunity_minute_calls
        .join(minute_calls_ratio_dfs[0], on="community_id", how="left")
    )

    for df in minute_calls_ratio_dfs[1:]:
        full_edges_intracommunity_minute_calls_ratio = full_edges_intracommunity_minute_calls_ratio.join(
            df, on="community_id", how="left")
        
    #
    # One row per vertex id: the calls-made totals plus every collected
    # "calls made to VF members" ratio frame, all left-joined on `id`.
    calls_made_ratio_dfs = vodafone_types_dfs["ratio_calls_made_to_vf_community_members"]
    full_ratio_calls_made_to_vf_community_members = (
        grafo_completo.vertices
        .select("id")
        .join(calls_made_to_community_members, on="id", how="left")
        .join(calls_made_ratio_dfs[0], on="id", how="left")
    )

    for df in calls_made_ratio_dfs[1:]:
        full_ratio_calls_made_to_vf_community_members = full_ratio_calls_made_to_vf_community_members.join(
            df, on="id", how="left")
        
    #
    # One row per vertex id: the calls-received totals plus every collected
    # "calls received from VF members" ratio frame, all left-joined on `id`.
    calls_recv_ratio_dfs = vodafone_types_dfs["ratio_calls_received_from_vf_community_members"]
    full_ratio_calls_received_from_vf_community_members = (
        grafo_completo.vertices
        .select("id")
        .join(calls_received_from_community_members, on="id", how="left")
        .join(calls_recv_ratio_dfs[0], on="id", how="left")
    )

    for df in calls_recv_ratio_dfs[1:]:
        full_ratio_calls_received_from_vf_community_members = full_ratio_calls_received_from_vf_community_members.join(
            df, on="id", how="left")
        
    #
    # One row per vertex id: the minutes-made totals plus every collected
    # "minutes made to VF members" ratio frame, all left-joined on `id`.
    minut_made_ratio_dfs = vodafone_types_dfs["ratio_minut_made_to_vf_community_members"]
    full_ratio_minut_made_to_vf_community_members = (
        grafo_completo.vertices
        .select("id")
        .join(minut_made_to_community_members, on="id", how="left")
        .join(minut_made_ratio_dfs[0], on="id", how="left")
    )

    for df in minut_made_ratio_dfs[1:]:
        full_ratio_minut_made_to_vf_community_members = full_ratio_minut_made_to_vf_community_members.join(
            df, on="id", how="left")
        
    #
    # One row per vertex id: the minutes-received totals plus every collected
    # "minutes received from VF members" ratio frame, all left-joined on `id`.
    minut_recv_ratio_dfs = vodafone_types_dfs["ratio_minut_recv_from_vf_community_members"]
    full_ratio_minut_recv_from_vf_community_members = (
        grafo_completo.vertices
        .select("id")
        .join(minut_recv_from_community_members, on="id", how="left")
        .join(minut_recv_ratio_dfs[0], on="id", how="left")
    )

    for df in minut_recv_ratio_dfs[1:]:
        full_ratio_minut_recv_from_vf_community_members = full_ratio_minut_recv_from_vf_community_members.join(
            df, on="id", how="left")
    
    #################################
    ## Join all community features ##
    #################################
    
    # Community-only features (that is, features that have the
    # same value for all users in that community):
    # Combine the three community-keyed frames with left joins, then make the
    # key column literal-specific and replace numeric nulls with 0.0.
    community_features = full_label_propagation_ratio
    for community_level_df in (full_edges_intracommunity_num_calls_ratio,
                               full_edges_intracommunity_minute_calls_ratio):
        community_features = community_features.join(
            community_level_df, on="community_id", how="left")

    community_features = (
        community_features
        .withColumnRenamed("community_id", "community_id_"+literal)
        .na.fill(0.0)
    )
    
    # Features that are specific to each user:
    # Start from the (id, community) assignment and attach the community-level
    # features through the literal-specific community key.
    community_key_col = "community_id_"+literal
    community_features_with_id = (
        label_propagation_by_hour
        .select("id","label")
        .withColumnRenamed("label", community_key_col)
        .join(community_features, on=community_key_col, how="left")
    )

    # Attach the four per-user frames, each left-joined on `id`.
    for per_user_df in (full_ratio_calls_made_to_vf_community_members,
                        full_ratio_calls_received_from_vf_community_members,
                        full_ratio_minut_made_to_vf_community_members,
                        full_ratio_minut_recv_from_vf_community_members):
        community_features_with_id = community_features_with_id.join(
            per_user_df, on="id", how="left")

    community_features_with_id = (
        community_features_with_id
        .repartition(2100)
        .persist(StorageLevel.DISK_ONLY)
    )

    # count() is an action, so it forces the persisted frame to be computed
    # before it is stored alongside its literal.
    community_features_with_id.count()
    community_dfs.append((literal, community_features_with_id))

In [None]:
# Merge the per-literal feature frames into a single frame keyed by `id`.
# The outer join keeps ids present in only some of the frames; numeric nulls
# introduced by each join are replaced with 0.0 right after it.
community_df = community_dfs[0][1]

for literal, df in community_dfs[1:]:
    outer_joined = community_df.join(df, on="id", how="outer")
    community_df = outer_joined.na.fill(0.0)

# Lazy checkpoint: nothing is computed until an action runs on the result.
community_df_cached = community_df.checkpoint(eager=False)

In [None]:
# count() is an action: it forces evaluation of the frame that was
# checkpointed with eager=False.
community_df_cached.count()

In [None]:
# Attach `in_out_df_cached` and `community_df_cached` to the graph vertices.
# Both are left joins on `id`, so every vertex row is kept.
vertices_with_in_out = grafo_completo.vertices.join(
    in_out_df_cached, on="id", how="left")
feature_set = vertices_with_in_out.join(
    community_df_cached, on="id", how="left")

In [None]:
# Persist the final feature table, overwriting any previous run.
# The period suffix is derived from the `year`/`month` values set in the
# configuration cell, so running the notebook for another month cannot
# overwrite the table of a different period. For year=2017, month=9 this
# yields "tests_es.cdr_graph_features_201709_lp100".
output_table = "tests_es.cdr_graph_features_{:04d}{:02d}_lp100".format(year, month)
feature_set.write.mode("overwrite").saveAsTable(output_table)

In [None]:
# Completion marker printed once the cells above have run.
print("Finished!")

In [None]:
# Show the column names and types of the final feature set.
feature_set.printSchema()