In [1]:
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [3]:
# Create (or reuse) the Spark session for this notebook, running against the
# `local[*]` master with explicit executor/driver memory settings.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Data extractor")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "6g")
    .getOrCreate()
)

In [4]:
DATA_PATH = './ref_authors_process.parquet/'

In [5]:
spark_df = spark.read.parquet(DATA_PATH)

In [6]:
spark_df.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- has_doi: boolean (nullable = true)
 |-- issn: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- article_len: integer (nullable = true)
 |-- n_citation: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- year: double (nullable = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- gid: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- org: string (nullable = true)
 |    |    |-- orgid: string (nullable = true)
 |    |    |-- sid: string (nullable = true)
 |    |    |-- old_id: string (nullable = true)
 |    |    |-- orgs_count: integer (nullable = true)
 |-- venue: struct (nullable = true)
 |    |-- name_d: stri

In [7]:
spark_df.count()

2216306

In [8]:
# Keep only author entries that carry a name. For a null struct `x.name` is
# null as well, so this single predicate also removes null entries; a separate
# "x is not null" pass (the former, never-read `authors_clean_` column) is not
# needed.
id_authors = spark_df.withColumn(
    'authors_clean',
    f.expr('filter(authors, x -> x.name is not null)'),
)

In [9]:
# Number of author entries before and after filtering, shown as a
# (before, after) tuple.
(
    id_authors.select(f.explode('authors').alias('author')).count(),
    id_authors.select(f.explode('authors_clean').alias('author')).count(),
)

(6647988, 6643461)

In [10]:
# Remove the unfiltered `authors` array (and the `authors_clean_` helper column,
# if present) so that only the filtered `authors_clean` list remains.
# NOTE(review): `id_authors` is reassigned in place; re-running this cell after
# the rename to `authors` has been executed would drop the filtered column too.
id_authors = id_authors.drop('authors', 'authors_clean_')

In [11]:
# Give the filtered author list the original column name `authors`.
id_authors = id_authors.withColumnRenamed('authors_clean', 'authors')

In [12]:
id_authors.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- has_doi: boolean (nullable = true)
 |-- issn: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- article_len: integer (nullable = true)
 |-- n_citation: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- year: double (nullable = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- venue: struct (nullable = true)
 |    |-- name_d: string (nullable = true)
 |    |-- raw: string (nullable = true)
 |    |-- sid: string (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- old_id: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: st

In [28]:
def get_clean_pairs(df, col_name):
    """Build an ``old_id`` -> list mapping that keeps only in-dataset ids.

    Steps:
      1. Strip null elements from the array column ``col_name``.
      2. Discard rows whose stripped array is null or empty.
      3. Explode the remaining arrays into (``old_id``, element) pairs.
      4. Keep a pair only if its element equals the ``old_id`` of some row
         that survived step 2 (left-semi join).
      5. Collect the surviving elements back into one list per ``old_id``.

    Note that step 4 checks against the rows left after step 2, not against
    the rows of the returned frame: an ``old_id`` whose elements are all
    rejected in step 4 does not appear in the result, although other rows may
    still list it.

    Args:
        df: Spark DataFrame with an ``old_id`` column and an array column
            named ``col_name``.
        col_name: Name of the array column to clean.

    Returns:
        Spark DataFrame with two columns: ``old_id`` and ``col_name`` (the
        collected list of retained elements).
    """
    cleaned = f'{col_name}_cleaned'
    without_nulls = f.expr(f'filter({col_name}, x -> x is not null)')

    # Rows that still have at least one non-null element.
    non_empty = (
        df.withColumn(cleaned, without_nulls)
        .na.drop(subset=[cleaned])
        .where(f.col(cleaned) != f.array())
    )

    # Ids of the surviving rows, named `col` to line up with the default
    # output column name of `explode` used in the join below.
    known_ids = non_empty.select(f.col('old_id').alias('col'))
    pairs = non_empty.select(f.col('old_id'), f.explode(f.col(cleaned)))

    return (
        pairs.join(known_ids, ['col'], 'leftsemi')
        .groupby('old_id')
        .agg(f.collect_list('col').alias(col_name))
    )

In [29]:
id_authors_ref = get_clean_pairs(id_authors, 'references')

In [30]:
id_authors_ref.select(f.explode('references')).count()

17572755

In [31]:
id_authors_ref.select(f.explode('references')).to_pandas_on_spark().describe()

  fields = [
  for column, series in pdf.iteritems():


Unnamed: 0,col
count,17572755
unique,1399561
top,53e9986eb7602d97020ab93b
freq,12293


In [32]:
id_authors_ref.count()

1888307

In [33]:
id_authors_ref.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- references: array (nullable = false)
 |    |-- element: string (containsNull = false)



Эти данные терять уже не хочется, но текущий размер значительно больше ожидаемого, поэтому отсемплируем от него 60%.

In [34]:
# Draw about 60% of the rows without replacement, using a fixed seed.
# NOTE(review): `fraction` appears to act as a per-row sampling probability, so
# the resulting row count is approximate rather than exactly 60% — confirm
# against the Spark documentation.
sampled = id_authors_ref.sample(withReplacement=False, fraction=0.6, seed=42)

In [35]:
sampled.count()

1134093

Отлично. **Но!** После семплирования в списках `references` остались ссылки на статьи, которые не попали в семпл, поэтому их нужно снова отфильтровать.

In [36]:
sampled.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- references: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [37]:
id_refs = get_clean_pairs(sampled, 'references')

In [38]:
id_refs.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- references: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [39]:
id_refs.count()

1048031

In [41]:
id_refs.select(f.explode('references')).count()

6197933

In [46]:
id_refs.select(f.explode('references')).to_pandas_on_spark().describe()

  fields = [
  for column, series in pdf.iteritems():


Unnamed: 0,col
count,6197933
unique,716605
top,53e9986eb7602d97020ab93b
freq,7400


In [40]:
def join_tables(df, df_other, col_name):
    """Replace column ``col_name`` of ``df`` with the one from ``df_other``.

    Both frames are inner-joined on ``old_id``, so rows of ``df`` without a
    matching ``old_id`` in ``df_other`` are dropped.

    Args:
        df: Spark DataFrame with an ``old_id`` column; its own ``col_name``
            column (if any) is discarded.
        df_other: Spark DataFrame with ``old_id`` and ``col_name`` columns.
        col_name: Name of the column to take from ``df_other``.

    Returns:
        Spark DataFrame with all remaining columns of ``df`` followed by
        ``col_name`` from ``df_other``.
    """
    left = df.drop(col_name).alias('df')
    right = df_other.alias('df_other')

    # Resolve the join keys through the aliases instead of DataFrame attribute
    # access: when `df_other` is derived from `df`, `left.old_id` and
    # `right.old_id` can refer to the same underlying column, which makes the
    # condition ambiguous. Alias-qualified names are unambiguous.
    same_id = f.col('df.old_id') == f.col('df_other.old_id')

    return left.join(right, same_id, 'inner').select('df.*', f'df_other.{col_name}')

In [42]:
joined = join_tables(id_authors, id_refs, 'references')

In [43]:
joined.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- has_doi: boolean (nullable = true)
 |-- issn: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- article_len: integer (nullable = true)
 |-- n_citation: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- year: double (nullable = true)
 |-- venue: struct (nullable = true)
 |    |-- name_d: string (nullable = true)
 |    |-- raw: string (nullable = true)
 |    |-- sid: string (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- old_id: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- gid: string (nullable = true)
 |    |    |-- name: 

In [44]:
joined.count()

1048031

In [45]:
joined.select(f.explode('references')).count()

6197933

Сохраним этот датасет.

In [47]:
# Persist the final dataset. `overwrite` keeps this cell re-runnable: with the
# default save mode the write fails once 'fin_dataset.parquet' already exists,
# which breaks a second Restart & Run All. `joined` is not read from this path,
# so overwriting it is safe.
joined.write.mode('overwrite').parquet('fin_dataset.parquet')

In [48]:
fin_ds = spark.read.parquet('fin_dataset.parquet')

In [49]:
fin_ds.printSchema()

root
 |-- old_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- has_doi: boolean (nullable = true)
 |-- issn: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- article_len: integer (nullable = true)
 |-- n_citation: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- year: double (nullable = true)
 |-- venue: struct (nullable = true)
 |    |-- name_d: string (nullable = true)
 |    |-- raw: string (nullable = true)
 |    |-- sid: string (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- old_id: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- gid: string (nullable = true)
 |    |    |-- name: 

In [50]:
fin_ds.count()

1048031