In [1]:
import pyspark
import os
import math
import random
import sys
import pandas as pd
import numpy as np
from IPython.display import display
import re

# make sure pyspark tells workers to use python2 not 3 if both are installed\\n\",\n",
os.environ["PYSPARK_PYTHON"] = "python2"
os.environ['PYSPARK_SUBMIT_ARGS'] = ('--packages '
    'graphframes:graphframes:0.3.0-spark2.0-s_2.11'
    ' pyspark-shell')
os.environ['PYTHONPATH'] = ':'.join(sys.path)

import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext('local[*]')
sqlContext = SQLContext(sc)

In [64]:
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import MatrixEntry
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.feature import HashingTF
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import col
from difflib import SequenceMatcher
from graphframes import *
import toyplot

In [3]:
!pip install toyplot



# Obtaining the data

The full AGRIS repository is available from the [AGRIS AllegroGraph WebView](http://202.45.139.84:10035/catalogs/fao/repositories/agris) which allows us to query the repository from a sparql endpoint. However, instead of using it we have downloaded the full repository in RDF format and uploaded it to our own [Virtuoso Sparql Endpoint](http://virtuoso.udl.cat:8890/sparql). By this way, we have been able to query the repository without problems of timeout or other problems we were having using the AGRIS sparql endpoint.

Once we had our sparql endpoint working, we could start obtaining the data. It neither was an easy task because the AGRIS repository contains a huge volume of data that we could not obtain with a single query. To solve this problem we limited the queries to return one million rows per query, so we had to execute the query multiples times to obtain the full dataset.

Furthermore, another trouble to obtain the data was that some fields were lists of elements and it is a little bit difficult to obtain those fields in list format. In order to solve this problem we have divided our query into some different queries. In the first query we have obtained all the fields which contains a single value for each article.

"AGRIS QUERY GENERAL"

The other queries are one query for each field in list format we want to obtain.

"OTHER QUERIES"

In [4]:
articles_spark_DF = sqlContext.read.format("csv").load("data/Article-*.csv",header = True)
articles_lang_spark_DF = sqlContext.read.format("csv").load("data/Lang-*.csv",header = True)
authors_spark_DF = sqlContext.read.format("csv").load("data/Authors-*.csv",header = True)
subjects_spark_DF = sqlContext.read.format("csv").load("data/Sub-*.csv",header = True)
print articles_spark_DF.count()
print articles_lang_spark_DF.count()
print authors_spark_DF.count()
print subjects_spark_DF.count()

5589673
5553754
5461861
4056963


As we have said before, the AGRIS repository has a huge volume of data and we will not be able to process the whole of it. So what we will do is load the whole dataset into pandas only to obtain a sample of 10000 rows. This will be the dataset we will use to do the cleaning process.

In [5]:
df1 = pd.read_csv("data/Article-0M-1M.csv", sep=",",header=0,low_memory=False)
df2 = pd.read_csv("data/Article-1M-2M.csv", sep=",",header=0,low_memory=False)
df3 = pd.read_csv("data/Article-2M-3M.csv", sep=",",header=0,low_memory=False)
df4 = pd.read_csv("data/Article-3M-4M.csv", sep=",",header=0,low_memory=False)
df5 = pd.read_csv("data/Article-4M-5M.csv", sep=",",header=0,low_memory=False)
df6 = pd.read_csv("data/Article-5M-6M.csv", sep=",",header=0,low_memory=False)

df_ = pd.concat([df1,df2,df3,df4,df5,df6],ignore_index = True)

articles = df_.sample(n=10000)
articles.to_csv('data/current_sample.csv')
articles.sort_index(inplace=True)

df1 = pd.read_csv("data/Lang-0M-1M.csv", sep=",",header=0,low_memory=False)
df2 = pd.read_csv("data/Lang-1M-2M.csv", sep=",",header=0,low_memory=False)
df3 = pd.read_csv("data/Lang-2M-3M.csv", sep=",",header=0,low_memory=False)
df4 = pd.read_csv("data/Lang-3M-4M.csv", sep=",",header=0,low_memory=False)
df5 = pd.read_csv("data/Lang-4M-5M.csv", sep=",",header=0,low_memory=False)
df6 = pd.read_csv("data/Lang-5M-6M.csv", sep=",",header=0,low_memory=False)

df_ = pd.concat([df1,df2,df3,df4,df5,df6],ignore_index = True)

In [6]:
df1 = pd.read_csv("data/Authors-0M-1M.csv", sep=",",header=0,low_memory=False)
df2 = pd.read_csv("data/Authors-1M-2M.csv", sep=",",header=0,low_memory=False)
df3 = pd.read_csv("data/Authors-2M-3M.csv", sep=",",header=0,low_memory=False)
df4 = pd.read_csv("data/Authors-3M-4M.csv", sep=",",header=0,low_memory=False)
df5 = pd.read_csv("data/Authors-4M-5M.csv", sep=",",header=0,low_memory=False)
df6 = pd.read_csv("data/Authors-5M-6M.csv", sep=",",header=0,low_memory=False)

df_Authors_ = pd.concat([df1,df2,df3,df4,df5,df6],ignore_index = True)
df_Authors_.to_csv('data/authors.csv')

df1 = pd.read_csv("data/Sub-0M-1M.csv", sep=",",header=0,low_memory=False)
df2 = pd.read_csv("data/Sub-1M-2M.csv", sep=",",header=0,low_memory=False)
df3 = pd.read_csv("data/Sub-2M-3M.csv", sep=",",header=0,low_memory=False)
df4 = pd.read_csv("data/Sub-3M-4M.csv", sep=",",header=0,low_memory=False)
df5 = pd.read_csv("data/Sub-4M-5M.csv", sep=",",header=0,low_memory=False)

df_Subjects = pd.concat([df1,df2,df3,df4,df5],ignore_index = True)

# Cleaning process

## Duplicated articles

In [7]:
articles_spark_DF.createOrReplaceTempView("data")
# Check if there are duplicated articlesIds
print sqlContext.sql("SELECT COUNT(articleId) FROM data").show()
print sqlContext.sql("SELECT COUNT(DISTINCT(articleId)) FROM data").show()
print sqlContext.sql("SELECT articleId, Count(articleId) FROM data GROUP BY articleId HAVING COUNT(*) > 1").show()

+----------------+
|count(articleId)|
+----------------+
|         5589673|
+----------------+

None
+-------------------------+
|count(DISTINCT articleId)|
+-------------------------+
|                  5553754|
+-------------------------+

None
+------------+----------------+
|   articleId|count(articleId)|
+------------+----------------+
|CO2015000039|               4|
|DJ2012041730|               2|
|DJ2012041780|               2|
|DJ2012044257|               2|
|DJ2012044287|               2|
|DJ2012045098|               2|
|DJ2012046804|               2|
|DJ2012046887|               2|
|DJ2012046920|               2|
|DJ2012046952|               2|
|DJ2012049552|               2|
|DJ2012049843|               2|
|DJ2012054487|               2|
|DJ2012057300|               2|
|DJ2012063234|               2|
|DJ2012064631|               2|
|DJ2012067455|              12|
|DJ2012072330|               2|
|DJ2012072398|               2|
|DJ2012074987|               2|
+------------+---

In [8]:
# Join the same rows
articles_spark_DF = articles_spark_DF.dropDuplicates()
articles_spark_DF.createOrReplaceTempView("data")
print sqlContext.sql("SELECT COUNT(articleId) FROM data").show()
print sqlContext.sql("SELECT COUNT(DISTINCT(articleId)) FROM data").show()
print sqlContext.sql("SELECT articleId, Count(articleId) FROM data GROUP BY articleId HAVING COUNT(*) > 1").show()

+----------------+
|count(articleId)|
+----------------+
|         5574251|
+----------------+

None
+-------------------------+
|count(DISTINCT articleId)|
+-------------------------+
|                  5553754|
+-------------------------+

None
+--------------+----------------+
|     articleId|count(articleId)|
+--------------+----------------+
|  FR2001000690|               8|
|  RU2015H08472|               2|
|  KR2015000782|               4|
|  DJ2012084336|               2|
|  FR2001000569|               8|
|  FI2007188479|               4|
|  DJ2012075801|               2|
|  DJ2012076174|               2|
|     US8206741|               8|
|  DJ2012064631|               2|
|  BG2015100016|               4|
|     US8210952|               4|
|US201300869572|               4|
|  DJ2012046804|               2|
|  FI2007029186|               4|
|  DJ2012054487|               2|
|  DJ2012078767|               2|
|  DJ2012049552|               2|
|  DJ2012044257|               2|
|  DJ

In [9]:
def equalArticles(x,y):
    if not isinstance(x.issued,list):
        x = Row(articleId=x.articleId,issued=[x.issued],submitted=[x.submitted],conferenceTitle=[x.conferenceTitle],journalTitle=[x.journalTitle],journalIssn=[x.journalIssn])
    if not isinstance(y.issued,list):
        y = Row(articleId=y.articleId,issued=[y.issued],submitted=[y.submitted],conferenceTitle=[y.conferenceTitle],journalTitle=[y.journalTitle],journalIssn=[y.journalIssn])
    return Row(articleId=x.articleId, issued=list(set(x.issued) | set(y.issued)), submitted=list(set(x.submitted) | set(y.submitted)), conferenceTitle=list(set(x.conferenceTitle) | set(y.conferenceTitle)), journalTitle=list(set(x.journalTitle) | set(y.journalTitle)), journalIssn=list(set(x.journalIssn) | set(y.journalIssn)))

def mapPhase(a):
    y = a[1]
    #if isinstance(y.issued,list):
        #return Row(articleId=y.articleId,issued='|'.join([unicode(x, "utf-8") for x in y.issued]),submitted='|'.join([unicode(str(x), "utf-8") for x in y.submitted]),conferenceTitle='|'.join([unicode(str(x), "utf-8") for x in y.conferenceTitle]),journalTitle='|'.join([unicode(str(x), "utf-8") for x in y.journalTitle]),journalIssn='|'.join([unicode(str(x), "utf-8") for x in y.journalIssn]))

    return y

schema = StructType([StructField("articleId", StringType(), True),StructField("issued", StringType(), True),StructField("submitted", StringType(), True),StructField("conferenceTitle", StringType(), True),StructField("journalTitle", StringType(), True),StructField("journalIssn", StringType(), True)])

In [10]:
articles_spark_RDD = articles_spark_DF.rdd.map(lambda x: (x.articleId,x)).reduceByKey(equalArticles).map(mapPhase)
articles_spark_DF = sqlContext.createDataFrame(articles_spark_RDD,schema)
articles_spark_DF.createOrReplaceTempView("data")

In [11]:
print sqlContext.sql("SELECT COUNT(articleId) FROM data").show()
print sqlContext.sql("SELECT COUNT(DISTINCT(articleId)) FROM data").show()
print sqlContext.sql("SELECT articleId, Count(articleId) FROM data GROUP BY articleId HAVING COUNT(*) > 1").show()

+----------------+
|count(articleId)|
+----------------+
|         5553754|
+----------------+

None
+-------------------------+
|count(DISTINCT articleId)|
+-------------------------+
|                  5553754|
+-------------------------+

None
+---------+----------------+
|articleId|count(articleId)|
+---------+----------------+
+---------+----------------+

None


## Join articles data and language data

In [12]:
articles_spark_DF = \
articles_spark_DF.join(articles_lang_spark_DF, articles_spark_DF.articleId == articles_lang_spark_DF.articleId)\
.drop(articles_lang_spark_DF.articleId)

articles_spark_DF.createOrReplaceTempView("data")
print sqlContext.sql("SELECT * FROM data WHERE articleId = 'FR2001000690'").show()
#.rdd.filter(lambda x: isinstance(x.issued,list)).toDF()\

+------------+---------------+------------+---------------+--------------------+--------------------+--------+
|   articleId|         issued|   submitted|conferenceTitle|        journalTitle|         journalIssn|language|
+------------+---------------+------------+---------------+--------------------+--------------------+--------+
|FR2001000690|[mar2001, 1999]|[2002, 2001]|         [null]|[Game and Wildlif...|[1622-7662, 0750-...| eng|fre|
+------------+---------------+------------+---------------+--------------------+--------------------+--------+

None


## Get sampled data for the cleaning process

In [13]:
articles = articles_spark_DF.where("issued NOT LIKE '[%' AND language NOT LIKE '%|%'").sample(False,0.002,42).toPandas()

In [14]:
articles.rename(columns={'conferenceTitle':'conferenceData', 'journalTitle':'journalData'}, inplace=True)

## Extract conference information:

We want to classify all the conferences, setting the same ID in the same conferences.

Also from conferences, we are interested in the year, in order to compare it with the other years in the data.

In [15]:
# Tenen conferencia
len(articles[articles.conferenceData.notnull()])

290

In [16]:
conferenceDF =  pd.DataFrame(articles.conferenceData)

### Get the year from the data
First, we validate if all the dates have a year

In [17]:
# Que els quatre digits seguits siguin any
# Primer miro si hi han mes de un grup de 4 digits seguits ens les dades
pattern = r'^.*\d{4}.*\d{4}.*$'
np.sum(conferenceDF.conferenceData[conferenceDF.conferenceData.notnull()].str.contains(pattern,na=False))

8

In [18]:
# Si n'hi ha, mirem que siguin el mateix any. Mostrem els que son diferents valors. Aquetst els despreciarem
pattern = r'^.*(\d{4}).*(\d{4}).*$'
moreThanOne = conferenceDF.conferenceData[conferenceDF.conferenceData.notnull()].str.extract(pattern,expand=False)
moreThanOneDifferents = moreThanOne[moreThanOne[0].notnull() & (moreThanOne[0] != moreThanOne[1])]

In [19]:
moreThanOneDifferents.index

Int64Index([1944, 4942, 7014], dtype='int64')

In [20]:
# Si el resultat és 0 també s'ha de comprovar que totes tinguin 4 digits seguits
pattern = r'^.*\d{4}.*$'
len(conferenceDF.conferenceData[conferenceDF.conferenceData.notnull()]) - np.sum(conferenceDF.conferenceData[conferenceDF.conferenceData.notnull()].str.contains(pattern,na=False))

5

In [21]:
# Busquem les que no tenen els anys amb format correcte
badYearFormat = conferenceDF.conferenceData[conferenceDF.conferenceData.notnull()].str.contains(pattern,na=False)
badYearFormat = badYearFormat[badYearFormat == False]
conferenceDF.conferenceData[conferenceDF.conferenceData.index.isin(badYearFormat.index)]

907     2. Congresso della Frisona. Vietri (Italy). [nd].
2500    "25. Arbeitstagung der DVG. Arbeitsgebiet ""Le...
5852    "1. Statusseminar der Projektgruppe Bayern zur...
6851    Moderne tecnologie nell' allevamento dei suini...
9678    "Vortragsveranstaltung ""Fleischforschung aktu...
Name: conferenceData, dtype: object

In [22]:
# Obtenim l'any i eliminem els que hi han a moreThanOneDifferents
pattern = r'^.*(\d{4}).*$'
conferenceDF['conferenceDateYear'] = conferenceDF.conferenceData.str.extract(pattern, expand=False)
conferenceDF.conferenceDateYear[conferenceDF.conferenceDateYear.index.isin(moreThanOneDifferents.index)] = np.nan
conferenceDF.conferenceDateYear = pd.to_numeric(conferenceDF.conferenceDateYear, errors='coerce')
len(conferenceDF.conferenceDateYear[conferenceDF.conferenceDateYear.notnull()])

282

In [23]:
articles["conferenceYear"] = conferenceDF.conferenceDateYear

## Extract year and month from issued date

The field 'issued' shows when the article was published and it comes with some different formats incliding the year and sometimes the month. 

In [24]:
issuedYear = articles.issued.str.extract("(\d{4}|[0,3-9]\d)", expand=False)
issuedYear = pd.to_numeric(issuedYear, errors='coerce')

In [25]:
def completeYears(year):
    century = 1900 if year > 40 else 2000
    return century + year
issuedYear[issuedYear < 99] = issuedYear[issuedYear < 99].apply(completeYears)

In [26]:
articles['issuedYear'] = issuedYear

In some cases, the month comes in a numeric way and other times it comes within its abbreviation. We will unify it converting the abbreviations to numerical months.

In [27]:
issuedMonthText = articles.issued.str.extract("(ene|jan|feb|mar|apr|abr|mai|may|jun|jul|aug|ago|sept|sep|oct|nov|dic|dec)", expand=False)
issuedMonthNumeric = articles.issued.str.extract("\D(\d|1[1-2])\D?$", expand=False)
issuedMonth = issuedMonthText.fillna(issuedMonthNumeric)

In [28]:
issuedMonth = issuedMonth.replace('ene|jan', '1', regex=True)
issuedMonth = issuedMonth.replace('feb', '2')
issuedMonth = issuedMonth.replace('mar', '3')
issuedMonth = issuedMonth.replace('apr|abr', '4', regex=True)
issuedMonth = issuedMonth.replace('mai|may', '5', regex=True)
issuedMonth = issuedMonth.replace('jun', '6')
issuedMonth = issuedMonth.replace('jul', '7')
issuedMonth = issuedMonth.replace('ago|aug', '8', regex=True)
issuedMonth = issuedMonth.replace('sept|sep', '9', regex=True)
issuedMonth = issuedMonth.replace('oct', '10')
issuedMonth = issuedMonth.replace('nov', '11')
issuedMonth = issuedMonth.replace('dic|dec', '12', regex=True)
issuedMonth = issuedMonth.fillna('6')

In [29]:
articles['issuedMonth'] = issuedMonth

Finally, we will check that we have specified an issuedYear and issuedMonth for each row of the sample dataset.

In [30]:
articles.issuedYear[articles.issuedYear.isnull() == True].count()

0

In [31]:
articles.issuedMonth[articles.issuedMonth.isnull() == True].count()

0

## Check correctness of issued, submitted year and conference year

Once we have obtained the issued year, we can check that the article had been issued before it had been submitted to the AGRIS repository ('submitted' field).

In [32]:
articles.ix[articles.submitted - articles.issuedYear < 0, 'issuedYear'] = articles[articles.submitted - articles.issuedYear < 0].submitted

TypeError: ufunc 'subtract' did not contain a loop with signature matching types dtype('<U32') dtype('<U32') dtype('<U32')

In [None]:
articles[articles.submitted - articles.issuedYear < 0].count()

In [None]:
# Check correctness of conference year
articles.conferenceYear[(articles.conferenceYear < 1970) | (articles.conferenceYear > 2020)]

In [None]:
articles.articleId[articles.conferenceYear.notnull() & (articles.conferenceYear > articles.submitted)].count()

## Removing misstyped journal names

Our dataset contains the journal where some articles have been published. Specifically, there is a field specifying the name of the journal and another field specifying the ISSN of the journal. The problem is that sometimes the journal name does not match for a particular ISSN, which is obviously an error. To solve this problem we associate the most frequent journal name for each ISSN to all the articles of this ISSN.

In [33]:
articles.journalData = articles.journalIssn.map(articles.groupby('journalIssn').journalData.agg(lambda x:x.value_counts().index[0]).to_dict())

In [34]:
articles.journalIssn = articles.journalData.map(articles.groupby('journalData').journalIssn.agg(lambda x:x.value_counts().index[0]).to_dict())

In [35]:
articles.groupby('journalIssn').journalData.value_counts()

journalIssn                                                         journalData                                                                                                                                                                                                           
 F.R.). 18-21 Sep 1984."                                             der Schweizerischen Tieraerztlichen Vereinigung fuer Fleischhygiene und der Deutschen Gesellschaft fuer Hygiene und Mikrobiologie. Kommission Lebensmittelmikrobiologie und -hygiene. Garmisch-Partenkirchen (Germany     1
 otdelenie biologicheskikh nauk"                                    "Akhboroti Akademiiai fankhoi RSS Tochikiston, shu""bai fankhoi biologi = Izvestiia Akademii nauk Tadzhikskoi SSR                                                                                                          1
0000-8954                                                           Series entomologica                                                    

In [36]:
articles.groupby('journalData').journalIssn.value_counts()

journalData                                                                                                                                                                                                             journalIssn                                                       
 F.R.). 16-17 Sep 1985."                                                                                                                                                                                                Mitteilungsblatt der Bundesanstalt fuer Fleischforschung, Kulmbach    1
 F.R.). 27 Feb - 1 Mar 1989."                                                                                                                                                                                           GSF-Bericht (Germany                                                  1
 der Schweizerischen Tieraerztlichen Vereinigung fuer Fleischhygiene und der Deutschen Gesellschaft fuer Hygiene und Mikrobiologie. Kommissio

## Languages correctness

We have detected that sometimes language codes are capitalized, so we will transform all codes to lower case. Additionally, we want to be sure that the codes are correct so we have found a dataset with all the language codes and the language which is representing the code in English. Therefor, we will verify the language codes with the language codes dataset and we will fill another column with the language in English for a more visual understanding.

In [37]:
articles.language = articles.language.str.lower()

In [38]:
articles.groupby('language').language.value_counts()

language  language
ara       ara            2
arm       arm            1
bel       bel            1
bem       bem            1
bul       bul           66
chi       chi          101
cze       cze           59
dan       dan           59
dut       dut           64
eng       eng         7121
esp       esp          368
est       est            1
fin       fin            1
fra       fra           26
fre       fre          455
ger       ger          671
glg       glg            1
gre       gre            1
heb       heb            7
hun       hun           37
ind       ind           35
ita       ita          223
jpn       jpn          198
kor       kor           10
lat       lat            1
lav       lav            7
mac       mac            1
may       may            1
nor       nor            9
per       per            2
pol       pol           73
por       por          160
rum       rum           33
rus       rus          498
scc       scc            7
scr       scr            8
slo      

In [39]:
langs = pd.read_csv("data/language-codes-3b2.csv", sep=",",header=0,low_memory=False)

IOError: File data/language-codes-3b2.csv does not exist

In [40]:
def extractLanguageName(lang):
    langName = langs.ix[langs['alpha3-b'] == lang, 'English']
    if(langName.count() > 0):
        return langName.iloc[0]
    else:
        return np.nan

In [41]:
articles['languageName'] = articles.language.map(extractLanguageName)

NameError: global name 'langs' is not defined

In [42]:
langs.ix[langs['alpha3-b'] == 'fre', 'English'].iloc[0]

NameError: name 'langs' is not defined

In [43]:
articles[articles.languageName.isnull() == True].language.value_counts()

AttributeError: 'DataFrame' object has no attribute 'languageName'

In [None]:
articles.ix[articles.language == 'esp','languageName'] = 'Spanish; Castilian'
articles.ix[articles.language == 'esp','language'] = 'spa'
articles.ix[articles.language == 'fra','languageName'] = 'French'
articles.ix[articles.language == 'fra','language'] = 'fre'

### Set conference Ids

In [44]:
conferenceDF["conferenceId"] = np.nan

In [45]:
# https://docs.python.org/2.4/lib/sequence-matcher.html

# Return a measure of the sequences' similarity as a float in the range [0, 1].
# Where T is the total number of elements in both sequences, and M is the number of matches, this is 2.0*M / T. 
# Note that this is 1.0 if the sequences are identical, and 0.0 if they have nothing in common.
# This is expensive to compute if get_matching_blocks() or get_opcodes() hasn't already been called, 
# in which case you may want to try quick_ratio() or real_quick_ratio() first to get an upper bound.

def similar(a, b):
    return SequenceMatcher(None, a, b).ratio()

In [46]:
# https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
# https://pypi.python.org/pypi/editdistance

# Levenshtein distance algorithm. This tells us the number of edits needed to turn one string into another. 

def levenshtein(source, target):
    if len(source) < len(target):
        return levenshtein(target, source)

    # So now we have len(source) >= len(target).
    if len(target) == 0:
        return len(source)

    # We call tuple() to force strings to be used as sequences
    # ('c', 'a', 't', 's') - numpy uses them as values by default.
    source = np.array(tuple(source))
    target = np.array(tuple(target))

    # We use a dynamic programming algorithm, but with the
    # added optimization that we only need the last two rows
    # of the matrix.
    previous_row = np.arange(target.size + 1)
    for s in source:
        # Insertion (target grows longer than source):
        current_row = previous_row + 1

        # Substitution or matching:
        # Target and source items are aligned, and either
        # are different (cost of 1), or are the same (cost of 0).
        current_row[1:] = np.minimum(
                current_row[1:],
                np.add(previous_row[:-1], target != s))

        # Deletion (target grows shorter than source):
        current_row[1:] = np.minimum(
                current_row[1:],
                current_row[0:-1] + 1)

        previous_row = current_row

    return previous_row[-1]

In [47]:
# DIMSUM

# http://spark.apache.org/docs/2.0.2/api/python/pyspark.mllib.html#module-pyspark.mllib.linalg.distributed
# https://es.wikipedia.org/wiki/Similitud_coseno
# https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html
# https://forums.databricks.com/questions/248/when-should-i-use-rowmatrixcolumnsimilarities.html
# https://github.com/mrsqueeze/spark-hash

def lexicoOrder(s):
    s.sort(key=lambda item: (len(item), item))
    return s

htf = HashingTF()

def parseHash(e):
    return htf.indexOf(e)

def parseStringToHashItemsList(s):
    res = []
    for e in lexicoOrder(s.split(" ")):
        res.append(parseHash(e))
    return res
    
rows = sc.parallelize(strings).map(parseStringToHashItemsList)

    mat = RowMatrix(rows)

def parseVec(v,idx):
    vec = []
    for idy, el in enumerate(v.toArray()):
        vec.append(MatrixEntry(idy,idx,el))
    return vec
  

def transpose(m):
    rows = m.rows
    rdd = rows.zipWithIndex().flatMap(lambda (vec, idx): [x for x in parseVec(vec,idx)])
    return rdd

coordMat = CoordinateMatrix(transpose(mat))
rowMatrix = coordMat.toRowMatrix()
mat = rowMatrix

sims = mat.columnSimilarities() # sims es del tipus CoordinateMatrix
# sims.entries és un RDD de MatrixEntry del format (long,long,float)

transformedRDD = sims.entries.map(lambda e: np.array([e.i,e.j,e.value]))
print transformedRDD.collect()

IndentationError: unexpected indent (<ipython-input-47-0dedf5036d3d>, line 26)

In [48]:
# https://en.wikipedia.org/wiki/MinHash
# Implemented in the last scala or java API 
# http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.MinHashLSH
# http://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/ml/feature/MinHashLSH.html

# LSH is a form of bucketing/binning which groups similar rows into k number of buckets. 
# The contents of these buckets are then compared in parallel to reduce overall wall-clock time.


In [49]:
strings = []
strings.append("Hola buenos dias como estamos todos") # 0
strings.append("Hola buenos dias como estamos todos y todas") # 1
strings.append("Hola buenos dias como estamos todas") # 2
strings.append("Hola buenos dias como estamos todos") # 3
strings.append("Hola buenos dias TEST como estamos todos") # 4
strings.append("Sin relación alguna") # 5
strings.append("dias buenos estamos todos como Hola") # 6
strings.append("Sin relación algunas") # 7
strings.append("Sin relación algunadd") # 8

In [50]:
# https://en.wikipedia.org/wiki/Jaccard_index

# The Jaccard Similarity between two sets A, B is
# |Intersection(A, B)| / |Union(A, B)|

def jaccardSimilarity (totalCommon, totalA, totalB):
    union = totalA + totalB - totalCommon
    return totalCommon * 1.0 / union

def intersect(a,b):
    res = []
    for x in a:
        if x in b:
            res.append(x)
    return res

def parseData(a,b):
    if jaccardSimilarity(len(intersect(a[0],b[0])),len(a[0]),len(b[0])) > 0.5:
        return [a[1],[b[1]]]
    return [-1]

In [51]:
rdd = sc.parallelize(strings).map(lambda a: [x for x in a.split(" ")]).zipWithIndex()
cart = rdd.cartesian(rdd)

In [52]:
res = cart.filter(lambda (a,b): a[1] < b[1])\
.map(lambda (a,b):parseData(a,b))\
.filter(lambda x: x[0] > 0)\
.reduceByKey(lambda x,y: x + y)
res.collect()

[(1, [2, 3, 4, 6]), (2, [3, 4, 6]), (3, [4, 6]), (4, [6])]

In [53]:
def similarConference(a,b):
    if a["conferenceDateYear"] != b["conferenceDateYear"]:
        return False
    return similar(a["conferenceData"],b["conferenceData"]) > 0.6

def conferenceComput(cdf):
    ID = 1
    diff = 0
    addNew = True
    for idx, x in cdf[cdf.conferenceData.notnull()].iterrows():
        if ID == 1:
            cdf.loc[idx,"conferenceId"] = ID
            ID += 1
            continue
        for idy, y in cdf[cdf.conferenceId.notnull()].iterrows():
            if similarConference(x,y):
                #cdf.loc[idx,"conferenceId"] = cdf.loc[idy,"conferenceId"]
                cdf.set_value(idx,"conferenceId",cdf.loc[idy,"conferenceId"])
                addNew = False
                diff += 1
                break
        if addNew:
            cdf.loc[idx,"conferenceId"] = ID
            ID += 1
        addNew = True
    return diff
conferenceComput(conferenceDF)

15

In [54]:
articles["ConferenceId"] = conferenceDF.conferenceId

# Coauthors networks

In [55]:
authorsLines = sc.textFile("data/authors.csv")
header = authorsLines.first()
rows = authorsLines.filter(lambda line: line != header)
authors = rows.flatMap(lambda line: re.findall('"([^"]*)"', line)).map(lambda a: a.split('|')) 

In [57]:
authorsLines.count()

5576414

## Coauthorship networks

As the final objective of this project is to disambiguate author names which corresponds to different persons, we will do it by obtaining the coauthorship networks of a given author and toughen the different networks with other parameters. 

The first step of this process is to extract the coauthorship networks of a given author. To do it, we filter the rows searching for the given author and we obtain the network representing the relations of this author.

In [58]:
import itertools   
keyToSearch = 'Zidorn, C.'#'Yamaoka, Y.'#'Zidorn, C.'#'Ellmerer, E.P.'#'Congress.'
filteredAuthors = authors.map(set).filter(lambda a: keyToSearch in a).map(lambda a: a.difference({keyToSearch}))
filteredAuthors.count()

20

In [59]:
coauthors = filteredAuthors.flatMap(lambda a: [[tuple(sorted(coauth)), 1] for coauth in itertools.combinations(a, 2)])
coauthorsMap = coauthors.reduceByKey(lambda x, y: x + y)
coauthorsList = coauthorsMap.map(lambda (key,val): (key[0],key[1],val))

In [60]:
def yieldFunc(list):
    for item in list:
        yield item
coauthorsunique = filteredAuthors.flatMap(yieldFunc).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)

In [None]:
def toCSVLine(data):
    return ';'.join(str(d) for d in data)

lines = coauthorsunique.map(toCSVLine)
lines.saveAsTextFile("data/coauthorsunique/" + keyToSearch.split(",")[0]);

lines = coauthorsList.map(toCSVLine)
lines.saveAsTextFile("data/coauthorslist/" + keyToSearch.split(",")[0]);

In [63]:
v = sqlContext.createDataFrame(coauthorsunique,['id','times'])
e = sqlContext.createDataFrame(coauthorsList,['src','dst','times'])
g = GraphFrame(v, e)

In [65]:
edges_array = np.array(
    g.edges
    .select("src", "dst").rdd
    .map(lambda r: [r.src, r.dst])
    .collect()
)

In [66]:
toyplot.graph(edges_array, width=1000);

## Most frequent coauthors

We have used the algorithm FP-Growth to obtain the most frequent coauthor networks oof two or more authors. Later on, we will use this results to toughen the relation between these author on the particular author's coauthorship networks graph. By this way, when we will see a strong line between two authors on a coauthorship network we will rely on this relation and it will be another factor to determine that the author is only one person for this coauthorship network.

In [None]:
coauthors = authors.map(lambda a: list(set(a)))

In [None]:
from pyspark.mllib.fpm import FPGrowth

model = FPGrowth.train(coauthors, minSupport=0.00001, numPartitions=4)
rddfreqitemsets = model.freqItemsets()

We have to take into account that this algorithm considers single strings as a subset of items, so we must expect that the most common subsets to be single item subsets.

In [None]:
rddfreqitemsets.take(50)

For this reason we have had to decrease to a very small value the 'minSupport' parameter of the algorithm. This value indicates the minimum frequency of a subset to be considered as a result. Otherwise, we had only obtained single item subsets in the results.

In [None]:
result = rddfreqitemsets.filter(lambda x  : len(x.items) >= 2).collect()

Unfortunately, once we could obtain a result with non single item subsets, we viewed that most of the authors obtained from this algorithm are Universities and other strange or generic names that does not correspond to an author name. So from the results we will only be able to extract a few useful subsets for the disambiguating process.

In [None]:
result