# Práctica 3 - Ejercicio 7

Dado el dataset Genealogía el cual está formado por: 

`<nombre_individuo, dni_individuo, dni_mamá>`

realice distintas funciones que:

1. Dado los dni de dos individuos indicar si son primos (dos individuos son primos si tienen la misma abuela)
2. Dado los dni de dos individuos i 1 y i 2 indicar si i 1 es ancestro de i 2 .
3. El nombre de la “abuela” que tiene más descendientes 
4. Los nombres de los hermanos de la familia más numerosa (la cantidad de integrantes de una familia se calcula como la cantidad de hermanos más la mamá). Podría existir más de una familia más numerosa, en cuyo caso se deben imprimir todos los nombres de los hermanos integrantes de cada familia.


In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import col

sc = SparkContext("local", "My program")
sqlContext = SQLContext(sc)

In [2]:
genealogia = sc.textFile("datasets/genealogia.txt")
genealogia = genealogia.map(lambda t : t.split("\t"))

genealogia = genealogia.map(lambda t: 
                               Row(
                                   nombre_individuo = str(t[0]),
                                   dni_individuo = int(t[1]),
                                   dni_mama = int(t[2]) if t[2] != 'None' else None 
                               )
                           )

genealogiaDF = sqlContext.createDataFrame(genealogia)
genealogiaDF.registerTempTable("Genealogia")

### a) Dado los dni de dos individuos indicar si son primos (dos individuos son primos si tienen la misma abuela).

In [4]:
dni1 = 646
dni2 = 223

def getAbuela(dni):
    query = f"""
    SELECT g.nombre_individuo, g.dni_individuo, abuela.nombre_individuo as nombre_abuela, abuela.dni_individuo as dni_abuela
    FROM Genealogia g
    INNER JOIN Genealogia madre ON g.dni_mama = madre.dni_individuo
    INNER JOIN Genealogia abuela ON madre.dni_mama = abuela.dni_individuo
    WHERE g.dni_individuo = '{ dni }';"""
    result = sqlContext.sql(query)
    return(result.select('dni_abuela').first()[0])

son_primos = getAbuela(dni1) == getAbuela(dni2)
print(f"{dni1} es primo de {dni2}: " + str(son_primos))

646 es primo de 223: True


### b) Dado los dni de dos individuos i1 y i2 indicar si i1 es ancestro de i2.

In [5]:
query = f"""
SELECT g.nombre_individuo, g.dni_individuo, abuela.nombre_individuo as nombre_abuela, abuela.dni_individuo as dni_abuela
FROM Genealogia g
INNER JOIN Genealogia madre ON g.dni_mama = madre.dni_individuo
INNER JOIN Genealogia abuela ON madre.dni_mama = abuela.dni_individuo"""

dni1 = 2184
dni2 = 646

def getMadre(id):
    query = f"""
    SELECT g.dni_mama
    FROM Genealogia g
    WHERE g.dni_individuo = '{ id }';"""
    result1 = sqlContext.sql(query)
    return result1.first()[0]


def getAncestros(id):
    ancestros = []
    madre = id
    while madre is not None:
        madre = getMadre(madre)
        if madre is not None:
            ancestros.append(madre)
    return ancestros


ancestros = getAncestros(dni2)
print(f"{dni1} es ancestro de {dni2}: " + str(dni1 in ancestros))

2184 es ancestro de 646: True


### c) El nombre de la “abuela” que tiene más descendientes

In [6]:
def getAbuelasConMasNietos():
    query = f"""
    SELECT abuela.nombre_individuo, abuela.dni_individuo, COUNT(*) as count
    FROM Genealogia g
    INNER JOIN Genealogia madre ON g.dni_mama = madre.dni_individuo 
    INNER JOIN Genealogia abuela ON abuela.dni_individuo = madre.dni_mama
    GROUP BY abuela.nombre_individuo, abuela.dni_individuo
    ORDER BY count DESC;"""
    result = sqlContext.sql(query)
    max = result.first()[2]
    print(f"count: {result.count()}")
    print(f"Cantidad máxima de nietos: {max}")
    result = result.filter(col("count") == max)
    return result.show()

getAbuelasConMasNietos()

count: 915
Cantidad máxima de nietos: 9
+----------------+-------------+-----+
|nombre_individuo|dni_individuo|count|
+----------------+-------------+-----+
|         Czvqfzh|         1046|    9|
|          Tsdzfx|          960|    9|
|        Darpfeqd|         2785|    9|
|         Symqeqf|         4460|    9|
|          Khqrpf|          572|    9|
+----------------+-------------+-----+



In [7]:
def getAbuelasConMasDescendientes():
    query = f"""
    SELECT g.nombre_individuo, g.dni_individuo
    FROM Genealogia g
    WHERE g.dni_mama is null;"""
    max = (0,0)
    result = sqlContext.sql(query)
    abuelas_array = [int(row.dni_individuo) for row in result.collect()]
    print(f"Los raices del arbol genealógico es {abuelas_array}")
    for abuela in abuelas_array:
        count = getDescendientes(abuela)
        print(f"{abuela} tiene {count} descendientes")
        if (max[1] < count):
            max = [abuela, count]
    print(f"La abuela con mas descendientes es {max[0]} con {max[1]}")
    return max

def getHijos(id):
    query = f"""
    SELECT g.dni_individuo
    FROM Genealogia g
    WHERE g.dni_mama = '{ id }';"""
    result = sqlContext.sql(query)
    hijos_array = [int(row.dni_individuo) for row in result.collect()]
    return hijos_array;

def getDescendientes(id, c=0):
    hijos = getHijos(id)
    count = c + len(hijos)
    if (len(hijos)>0):
        for h in hijos:
            count = getDescendientes(h,count)

    return count

getAbuelasConMasDescendientes()


Los raices del arbol genealógico es [1, 4113, 1536, 1945, 4786, 979, 310, 1271, 4295, 2085, 1229]
1 tiene 8 descendientes
4113 tiene 770 descendientes
1536 tiene 0 descendientes
1945 tiene 0 descendientes
4786 tiene 496 descendientes
979 tiene 577 descendientes
310 tiene 0 descendientes
1271 tiene 647 descendientes
4295 tiene 0 descendientes
2085 tiene 361 descendientes
1229 tiene 147 descendientes
La abuela con mas descendientes es 4113 con 770


[4113, 770]

### d) Los nombres de los hermanos de la familia más numerosa (la cantidad de integrantes de una familia se calcula como la cantidad de hermanos más la mamá). 
Podría existir más de una familia más numerosa, en cuyo caso se deben imprimir todos los nombres de los hermanos integrantes de cada familia

In [8]:
def getMadres():
    query = f"""
    SELECT g.dni_mama, COUNT(*) as count
    FROM Genealogia g
    WHERE g.dni_mama is not null
    GROUP BY g.dni_mama 
    ORDER BY count DESC"""
    result = sqlContext.sql(query)
    return result

madres = getMadres()
max = madres.first()[1]
madres = madres.filter(col("count") == max)

nombres = genealogiaDF.join(madres, madres.dni_mama == genealogiaDF.dni_mama).select("nombre_individuo", "g.dni_mama")
print(nombres.show())

+----------------+--------+
|nombre_individuo|dni_mama|
+----------------+--------+
|        Darpfeqd|     474|
|         Sqvbmee|     474|
|           Dybre|     474|
|          Okwyus|    2529|
|        Zpuydcfi|    2529|
|         Wfqnfxt|    2529|
|          Glyots|    4590|
|           Hnflm|    4590|
|           Zzkoy|    4590|
|           Pbsdw|     418|
|         Uyufhmp|     418|
|        Pbboyvlb|     418|
|          Kzvxtq|     541|
|          Pjqcvc|     541|
|         Czvqfzh|     541|
|        Ashuevwd|     222|
|        Iespugow|     222|
|          Ovldft|     222|
|        Yjrsixqd|    1371|
|           Lbhox|    1371|
+----------------+--------+
only showing top 20 rows

None
