# Data validation between the data set in BigQuery and GraphDB

The objectives are to:
1. Ensure complete upload of data to BigQuery and GraphDB.
2. Check for data qualtiy issues.
3. Gain insights on the data.
4. Engineer features.

In [None]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
# Mount to Google Drive to save results
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/MSc/2020-21/Research\ Project/Colab/
%ls

In [None]:
# Connect to GCP Bucket
from google.colab import auth
auth.authenticate_user()

In [None]:
# Set GCP project ID and region to Europe West 2 - London
PROJECT = 'fake-news-bs-detector'
!gcloud config set project $PROJECT
REGION = 'europe-west2'
CLUSTER = '{}-cluster'.format(PROJECT)
!gcloud config set compute/region $REGION
!gcloud config set dataproc/region $REGION

!gcloud config list # show some information

## Check the number of files in successive GCP cloud storage buckets

In [None]:
# Count the number of cleaned JSON files from the end of stage 1 in the pipeline
!gsutil ls -l gs://fake_news_cleaned_json/*.json | wc -l

In [None]:
# Count the number of parsed JSON and TTL files into triples at the end of stage 2 in the pipeline
!gsutil ls -l gs://fake_news_ttl_json/*.ttl | wc -l
!gsutil ls -l gs://fake_news_ttl_json/*.json | wc -l

The variance between the 59,733 cleaned files to 27,590 turtle documents would suggest this is due to the raw data containing duplicating records for the same news web page, when the turtles are indexed by the hash value of the URLs and therefore would overwrite leading to small number of samples.

In [None]:
# Based on https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries#bigquery_simple_app_client-python
from google.cloud import bigquery
client = bigquery.Client(PROJECT)


## Profile the data in its original form held in BigQuery

In [None]:
# BIgQuery data row count
query_job = client.query(
    """
    SELECT COUNT(*) AS POPULATION_COUNT
    FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
    """
)

res_df = query_job.result().to_dataframe()  # Waits for job to complete.

res_df

Therefore deviation by one record compared to the number of files in `gs://fake_news_ttl_json`.

In [None]:
# BIgQuery data row count
query_job = client.query(
    """
    WITH URL_LIST AS (
      SELECT 
      URL
      , COUNT(*) AS URL_COUNT
      FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
      GROUP BY URL
    )
    SELECT * FROM URL_LIST WHERE URL_COUNT > 1
    """
)

res_df = query_job.result().to_dataframe()  # Waits for job to complete.

res_df

Therefore no samples found to have duplicating URL in the BigQuery table, and all articles have unique URLs.

In [None]:
# BIgQuery data preview
query_job = client.query(
    """
    SELECT *
    FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
    LIMIT 10
    """
)

res_df = query_job.result().to_dataframe()  # Waits for job to complete.

res_df

In [None]:
# BigQuery count by domain
query_job = client.query(
    """
    SELECT
    DOMAIN_HASH
    , LABEL
    , COUNT(*) AS ARTICLES_COUNT
    FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
    GROUP BY DOMAIN_HASH, LABEL
    ORDER BY ARTICLES_COUNT DESC
    """
)

res_df = query_job.result().to_dataframe()

# Tally the domain hash to ensure each domain only has one label
domain_tally_ls = []
duplicate_domain_tally_ls = []
for i, row in res_df.iterrows():
  if row['DOMAIN_HASH'] in domain_tally_ls:
    duplicate_domain_tally_ls += [True]
  else:
    duplicate_domain_tally_ls += [False]
  
  # Add domain hash to the list of domains already reviwed
  domain_tally_ls += [row['DOMAIN_HASH']]

res_df['DOMAIN_HAS_MULTIPLE_LABEL'] = duplicate_domain_tally_ls

res_df

In [None]:
# BIgQuery count by label
query_job = client.query(
    """
    SELECT
    LABEL
    , COUNT(*) AS LABEL_COUNT
    FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
    GROUP BY LABEL
    ORDER BY LABEL_COUNT DESC
    """
)

res_df = query_job.result().to_dataframe()

print('Total: {}'.format(res_df['LABEL_COUNT'].sum()))

res_df

In [None]:
# BIgQuery list of URLs
query_job = client.query(
    """
    SELECT DISTINCT
    URL_HASH
    FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
    """
)

res_df = query_job.result().to_dataframe()

print('Total: {}'.format(res_df.shape[0]))

res_df.head()

Noted that there were no classification for 990 samples, and further 354 with unknown classifications.



## Profile the data in GraphDB

In [None]:
# Install the wrapper package
# Source: https://github.com/RDFLib/sparqlwrapper
!pip install sparqlwrapper

In [None]:
# Code based on: https://sparqlwrapper.readthedocs.io/en/latest/main.html
from SPARQLWrapper import SPARQLWrapper, JSON

queryString = """
PREFIX aa: <http://www.city.ac.uk/ds/inm363/aaron_altrock#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

select (count(?url_hash) as ?url_count) where {
  ?url_hash rdf:type aa:urlHash .
}
"""


sparql = SPARQLWrapper("http://35.246.120.165:7200/repositories/src_fake_news")
sparql.setReturnFormat(JSON)
sparql.setQuery(queryString)

try :
   res_dct = sparql.query().convert()
   print('OK')

except Exception as e:
   print('ERROR: {}'.format(e))
   

In [None]:
# No. of URL hash in GraphDB
res_dct.get('results').get('bindings')[0].get('url_count').get('value')

Therefore noted that the number of news articles as URL hashes were completely uploaded when compared to BigQury count given both have the same number of articles `27598`.

In [None]:
res_dct

### No. of URL hashes

In [None]:
# Code based on: https://sparqlwrapper.readthedocs.io/en/latest/main.html

queryString = """
PREFIX aa: <http://www.city.ac.uk/ds/inm363/aaron_altrock#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

select ?domain_hash ?label (count(?url_hash) as ?url_count) where {
  ?domain_hash rdf:type aa:domainHash .
  ?url_hash rdf:type aa:urlHash .
  ?label rdf:type aa:newsLabel .
  ?url_hash aa:has_domain_hash ?domain_hash .
  ?url_hash aa:has_news_label ?label .
}
GROUP BY ?domain_hash ?label
ORDER BY ?url_count
"""


sparql = SPARQLWrapper("http://35.246.120.165:7200/repositories/src_fake_news")
sparql.setReturnFormat(JSON)
sparql.setQuery(queryString)

try :
   res_dct = sparql.query().convert()
   print('OK')

except Exception as e:
   print('ERROR: {}'.format(e))
   

In [None]:
res_dct

In [None]:
import re
res_ls = res_dct.get('results').get('bindings')

# Helper func to transform SPARQLWrapper query result output (dict) to Pandas dataframe
def parse_to_dataframe(res_ls):

  # If query result has content then parse to data frame else return None
  if len(res_ls) > 0:
    # Get column names
    col_nm_ls = list(res_ls[0].keys())

    parsed_res_ls = []

    for res_dct in res_ls:
      __res_ls = []
      for k, v in res_dct.items():
        __res_ls += [v.get('value')]

      # __res_ls = [res_dct]
      parsed_res_ls += [__res_ls]
    
    res_df = pd.DataFrame.from_dict(parsed_res_ls)
    res_df.columns = col_nm_ls
    res_df.reset_index(inplace=True, drop=True)

    return res_df

  else:
    return None

# Parse dict output from SPARQL to Pandas data frame
domain_url_count_df = parse_to_dataframe(res_ls)

# Remove name space prefix
domain_url_count_df['domain_hash'] = domain_url_count_df['domain_hash'].map(lambda str: str[str.find('#') + 1:])
domain_url_count_df['label'] = domain_url_count_df['label'].map(lambda str: str[str.find('#') + 1:])

# Convert url_count to integer
domain_url_count_df['url_count'] = domain_url_count_df['url_count'].map(int)

# Count percentage
domain_url_count_df['url_count_pct'] = domain_url_count_df['url_count'].map(lambda val: val / domain_url_count_df['url_count'].sum() * 100)

domain_url_count_df.sort_values(by=['url_count'], ascending=False, inplace=True)

domain_url_count_df

In [None]:
# Summary of the classification by count of articles
label_count_df = domain_url_count_df[['label', 'url_count']].groupby('label').sum().sort_values(by='url_count', ascending=False).reset_index(drop=False)
total_rec_count = sum(label_count_df['url_count'].to_list())
print('Total number of articles: {}'.format(total_rec_count))
label_count_df['url_count_pct'] = label_count_df['url_count'].map(lambda val: val / total_rec_count * 100 if val is not None else 0)
label_count_df

Per the https://github.com/several27/FakeNewsCorpus details on each news article classification noted no such classifications as `unknown` nor a classification of `None`.  These may be added by the author of the data set who performed scrapping to backfill classifications that could not be found.

In [None]:
# Save summary to CSV
label_count_df.to_csv(r'label_count_df.csv', index=False)

## No. of domains by classification

In [None]:
print('No. of domains in total: {}'.format(domain_url_count_df['domain_hash'].shape[0]))
domain_count_df = domain_url_count_df[['domain_hash', 'label']].groupby(by='label').count().sort_values(by='domain_hash', ascending=False).reset_index()
domain_count_df.rename(columns={'domain_hash': 'domain_count'}, inplace=True)
domain_count_df['domain_count_pct'] = domain_count_df['domain_count'].map(lambda val: val / domain_count_df['domain_count'].sum() * 100)
domain_count_df.to_csv('domain_count_df.csv', index=False)
domain_count_df

## Repeated publication of the same news articles by URL, domain and classifications

In [None]:
# Code based on: https://sparqlwrapper.readthedocs.io/en/latest/main.html

queryString = """
PREFIX aa: <http://www.city.ac.uk/ds/inm363/aaron_altrock#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

select ?domain_hash ?url_hash ?body_hash ?label where {
  ?domain_hash rdf:type aa:domainHash .
  ?url_hash rdf:type aa:urlHash .
  ?body_hash rdf:type aa:bodyHash .
  ?label rdf:type aa:newsLabel .
  ?url_hash aa:has_domain_hash ?domain_hash .
  ?url_hash aa:has_news_label ?label .
  ?url_hash aa:has_body_hash ?body_hash .
}
ORDER BY ?url_count
"""


sparql = SPARQLWrapper("http://35.246.120.165:7200/repositories/src_fake_news")
sparql.setReturnFormat(JSON)
sparql.setQuery(queryString)

try :
   res_dct = sparql.query().convert()
   print('OK')

except Exception as e:
   print('ERROR: {}'.format(e))


# Parse dict output from SPARQL to Pandas data frame
res_ls = res_dct.get('results').get('bindings')
domain_url_body_df = parse_to_dataframe(res_ls)

# Remove name space prefix
domain_url_body_df['domain_hash'] = domain_url_body_df['domain_hash'].map(lambda str: str[str.find('#') + 1:])
domain_url_body_df['url_hash'] = domain_url_body_df['url_hash'].map(lambda str: str[str.find('#') + 1:])
domain_url_body_df['body_hash'] = domain_url_body_df['body_hash'].map(lambda str: str[str.find('#') + 1:])
domain_url_body_df['label'] = domain_url_body_df['label'].map(lambda str: str[str.find('#') + 1:])

domain_url_body_df.head()

In [None]:
# Summarise the number of URLs with the same text body in news articles
reuse_content_url_df = domain_url_body_df[['body_hash', 'label', 'url_hash']].drop_duplicates().groupby(['body_hash', 'label']).count().reset_index().sort_values(by='url_hash', ascending=False).head(50)
reuse_content_url_df.to_csv('reuse_content_url_df.csv', index=False)
print('No. of distinct text corpora: {}'.format(reuse_content_df.shape[0]))
reuse_content_url_df.head()

In [None]:
print('No. of text bodies re-used: {}'.format(reuse_content_url_df[reuse_content_url_df['url_hash'] >= 2].shape[0]))

Therefore noted all articles were re-used up to 410 times.

In [None]:
# Summarise the number of domains with the same text body in news articles
reuse_content_domain_df = domain_url_body_df[['body_hash', 'label', 'domain_hash']].drop_duplicates().groupby(['body_hash', 'domain_hash']).count().reset_index().sort_values(by='domain_hash', ascending=False).head(50)
reuse_content_domain_df.to_csv('reuse_content_domain_df.csv', index=False)
print('No. of distinct text corpora: {}'.format(reuse_content_df.shape[0]))
reuse_content_domain_df.head()

In [None]:
# Find text corpora referenced by more than one domains
reuse_content_domain_df[reuse_content_domain_df['label'] > 1]

Noted therefore whilst there were repeated republication of the same news text corpora as the bodies they were confined to within the same web domains.

In [None]:
# BigQuery count by domain
query_job = client.query(
    """
    SELECT
    *
    FROM `detect-fake-news-313201.fake_news_sql.src_fake_news`
    WHERE BODY_HASH = 'body_7a03935701b11bf99ae50445a0e67793'
    """
)

res_df = query_job.result().to_dataframe()

In [None]:
res_df.head()

In [None]:
res_df['body'].iloc[0]