In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

In [2]:
# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext
sqlContext=SQLContext(sc)

### Considerando el pagelink_sample.csv, usando una representación de grafos realizar una función genérica que nos permita calcular los contenidos que se encuentran a un grado de separación de cualquier identificador de contenido de la wikipedia. Mostrar el funcionamiento de la implementación con algún contenido incluido en el set de datos

Cargo solo las columnas necesarias del CSV pagelinks_sample y lo cargo con pandas a un DF

In [None]:
columns_pagelinks_1=['pl_from','pl_title']
df_pagelinks = pd.read_csv('pagelinks_sample.csv',usecols=columns_pagelinks_1,encoding='Latin-1')

In [None]:
df_pagelinks

Cargo por pandas el DF de contents y paso los titulos a una lista y filtro los pagelinks que estan en el DF solo si están en la lista de titulos para reducir su tamaño

In [5]:
df_contents = pd.read_csv('contents.csv',encoding='Latin-1')

In [6]:
title_list = df_contents['title'].tolist()

In [7]:
df_pagelinks_filter = df_pagelinks[df_pagelinks.pl_title.isin(title_list)]

In [8]:
df_pagelinks_filter

Unnamed: 0,pl_from,pl_title
2,6418187,Insecta
8,6571966,IMDb
12,5461523,Vandalismo
13,1098340,Wikidata
16,6224910,Referencias
...,...,...
79999986,89815,Usuarios
79999988,823826,Phylloderma
79999991,1339218,1980
79999993,5156930,1106


Se samplea el DF con frac=0.05 como se hablo con Gian en Slack por los problems de performance que tiene mi PC al terminar de ejecutar el final del ejercicio por lo grande que es el archivo

In [9]:
df_pagelinks_filter = df_pagelinks_filter.sample(frac=0.05)
df_pagelinks_filter.to_parquet('pagelinks.parquet')


Una vez creado el archivo .parquet, se reinicia el KERNEL

#### NOTA:
se vuelve a cargar el df contents porque se reinició el Kernel y para realizarle un sampleo para poder terminar con el ejercicio por los problemas de performance quetiene mi PC al momento de querer ejecutar los take luego del JOIN para verificar que todo vaya bien. Se guarda en un nuevo archivo .parquet

In [3]:
df_contents = pd.read_csv('contents.csv',encoding='Latin-1')
df_contents = df_contents.sample(frac=0.1)
df_contents.to_parquet('contents_filtred')

In [5]:
df_pagelinks_filter_p = sqlContext.read.parquet('pagelinks.parquet')
rdd_pagelinks = df_pagelinks_filter_p.rdd.cache()
df_contents = sqlContext.read.parquet('contents_filtred.parquet')
rdd_contents = df_contents.rdd.cache()

Se eliminan los DF para liberar memoria

In [6]:
del [[df_contents, df_pagelinks_filter_p]]

In [7]:
rdd_pagelinks.count()

1028647

Hago un map de los contenidos de la forma titulo como clave y id como valor

In [8]:
rdd_contents_map = rdd_contents.map(lambda x: (x[0],x[1]))

In [9]:
rdd_contents_map.take(10)

[('Bulnes (subte de Buenos Aires)', 467496),
 ('Glycine sarmentosa', 6877663),
 ('Ibn Bassal', 4186592),
 ('Partido Constitucionalista', 4692719),
 ('Al Shamal', 3683654),
 ('CategorÃ\xada:Traductores del Reino Unido', 1359497),
 ('Sabugalita', 5314907),
 ('Eichmann en JerusalÃ©n', 5150037),
 ('Crisis econÃ³mica de Venezuela de 2009-2010', 7850220),
 ('Loto (arbol)', 5763209)]

Hago un map con el RDD de pagelinks de la forma titulo del contenido hacia donde va el link como clave y el id de partida como clave. Esto es para poder hacer un Join sobre la clave titulo yque luego me queden como valores el id de salida y el id de llegada que lo traigo del RDD de contenidos

In [10]:
rdd_pagelinks_map = rdd_pagelinks.map(lambda x: (x[1],x[0]))

In [11]:
rdd_pagelinks_map.take(10)

[('Milestones', 6010413),
 ('Casete', 633667),
 ('Altitud', 4559745),
 ('Camboya', 6912186),
 ('1942', 73859),
 ('Vandalismo', 7945035),
 ('CEST', 3015353),
 ('Wikidata', 7769862),
 ('Bienvenidos', 8577382),
 ('1990', 3181298)]

In [12]:
rdd_pagelinks_map.count()

1028647

In [13]:
rdd_contents_map.count()

413216

Hago el join de ambos RDD

In [14]:
rdd_join = rdd_pagelinks_map.join(rdd_contents_map)

Ahora hago un map para que me quede con la forma de un grafo como se menciono en una repuesta a consultas en el Slack, Vertice de salida y vertice de llegada. Hago un take para verificar que me haya quedado correctamente. 

In [15]:
rdd_join_map = rdd_join.map(lambda x: (x[1][0],x[1][1]))

In [16]:
rdd_join_map.take(1)

[(1362458, 1274948)]

Creo la nueva funcion que busca a los vecinos que están a distancia 1 del id elegido como indicaron en el Slack:

In [17]:
def busca_vecinos(rdd, id_from):
    rdd_devuelto = rdd.filter(lambda x: x[0] == id_from)
    return rdd_devuelto.collect()

Realizo algunos ejemplos para verificar el correcto funcionamiento y que el output sea igual al que sugirieron en SLACK

In [23]:
busca_vecinos(rdd_join_map,1362458)

[(1362458, 1274948)]

In [33]:
busca_vecinos(rdd_join_map,2587499)

[(2587499, 268308),
 (2587499, 369661),
 (2587499, 2211582),
 (2587499, 2248571),
 (2587499, 2130270),
 (2587499, 787207),
 (2587499, 1060256),
 (2587499, 1518682),
 (2587499, 189430),
 (2587499, 1028906),
 (2587499, 1228553),
 (2587499, 4595460),
 (2587499, 95969),
 (2587499, 16381),
 (2587499, 1386724),
 (2587499, 3631175),
 (2587499, 1064067),
 (2587499, 3068581),
 (2587499, 1529466),
 (2587499, 183847),
 (2587499, 420360),
 (2587499, 753181),
 (2587499, 1274548),
 (2587499, 5755276),
 (2587499, 36152),
 (2587499, 78624),
 (2587499, 469880),
 (2587499, 1721512),
 (2587499, 1931334),
 (2587499, 692724),
 (2587499, 751442),
 (2587499, 2724695),
 (2587499, 2536612),
 (2587499, 3280920),
 (2587499, 2311900),
 (2587499, 901970),
 (2587499, 2886537),
 (2587499, 645197),
 (2587499, 2852645),
 (2587499, 2064624),
 (2587499, 364335),
 (2587499, 1193716)]