## Adding QA Checks
* If adding a new category:
  * Update the *cats* variable with the new report category. 
  * Add a Google Sheet with the same name [here](https://docs.google.com/spreadsheets/d/1rvKXdAPUl9wDDIC0NpwFZz5oC8bDBfLHD9nBnlbHTtw/edit#gid=2135584868)
* If adding a new qa check to an existing category:
  * Add a check in a new cell to the bottom of the group
  * Add link to CQA Jira Ticket in the comment where available
  * Ensure the report matches the defined format of all reports:
    * peopleid = H1 People ID
    * Category = cats variable value/google sheet for output
    * Check = String describing the check, e.g., Duplicate Twitter URLs
    * Flag = 1 for every people ID returned (flagged) by the check
* Output
  * The end of each section concatenates pandas dataframes from the section into 1 df
  * Update the pd.concat list for the section with the new report variable, e.g., n5

In [0]:
import jira as js
import pandas as pd
from datetime import date
from io import StringIO
as_of = date.today()
from email.message import EmailMessage
import smtplib
import mimetypes
#convert panda dataframes to spark dataframes
from pyspark.sql.types import *
import os
import ast

import pygsheets

In [0]:
# Report categories. Each entry is also the title of the Google Sheet tab the
# category is written to, and the checks below select their category by
# position (cats[0] .. cats[5]) -- append new categories at the end.
cats = [
    "People - Name Checks",
    "People - Affiliations",
    "People - Emails and Social Media",
    "People - Dupes",
    "People - Registry",
    "People - Publications",
]

# People - Name Checks

In [0]:
# Spec for the initials check:
# https://h1insights.atlassian.net/wiki/spaces/H1D/pages/3632594979/People+-+Name+First+Name+Initial+with+NULL+or+Middle+Name+Initial
#
# All three reports read tesseract_curie.People_latest and emit the standard
# report columns: peopleid, Category, Check, Flag.

# First name is a single character or ends in '.', and the middle name is
# likewise an initial or is missing.
initials_sql = f"""
select ppm.peopleid,
"{cats[0]}" as Category,
"First Initial Middle Initial or Null" as Check,
1 as Flag
from tesseract_curie.People_latest ppm
 where --Names
((length(ppm.firstname)<2 OR right(ppm.firstname,1)='.')
AND (length(ppm.middlename)<2 OR right(ppm.middlename,1)='.' OR ppm.middlename is null)
)
"""
n1 = spark.sql(initials_sql).toPandas()

# Full name is identical to the first name.
first_equals_full_sql = f"""
select ppm.peopleid,
"{cats[0]}" as Category,
"First Name Equals Full Name" as Check,
1 as Flag
from tesseract_curie.People_latest ppm
 where --Names
(ppm.fullname =ppm.firstname)
"""
n2 = spark.sql(first_equals_full_sql).toPandas()

# Last name is NULL.
missing_last_name_sql = f"""
select ppm.peopleid,
"{cats[0]}"  as Category,
"Last Name is Null" as Check,
1 as Flag
from tesseract_curie.People_latest ppm
 where --Names
(ppm.lastname is null)
"""
n3 = spark.sql(missing_last_name_sql).toPandas()

In [0]:
# Section output: every name-check report stacked into one frame.
name_reports = [n1, n2, n3]
names_df = pd.concat(name_reports)

# People - Affiliations

In [0]:
# People with publications or congress appearances but no row carrying an
# InstitutionID in People_Institution_latest.
# NOTE(review): the WHERE clause tests pipm.InstitutionID while the CASE tests
# size(spl.institutions)=0, so Check is NULL for a person who passes the WHERE
# but has institutions in the sliced table -- confirm this is intended.
no_affiliation_sql = f"""
    select 
                  ppm.peopleid,
                  "{cats[1]}" as Category,
                  case 
                  when  (spl.publicationCount>=1 
                        and size(spl.institutions)=0) 
                  then "No Affiliations Has Pubs" 
                  when  (ppm.ConfAppearancesCount>=1 
                      and size(spl.institutions)=0) 
                  then "No Affiliations Has Congress" end as Check,
                  1 as Flag
                  from tesseract_curie.People_latest ppm 
                  left join tesseract_curie.People_Institution_latest pipm 
                  on ppm.peopleid  = pipm.PeopleID 
                  left join curie.sliced_people_latest spl
                  on spl.id = ppm.peopleid
                  where 
                   ( (
                      --Pubs
                      (spl.publicationCount>=1)
                      --Congresses
                      OR 
                      (ppm.ConfAppearancesCount>=1))
                       and pipm.InstitutionID is null)
"""
n1 = spark.sql(no_affiliation_sql).toPandas()

# Work (or "no affiliation") affiliation type recorded at a society or journal.
# This report carries an extra master_organization_id column on top of the
# standard peopleid / Category / Check / Flag columns.
society_journal_work_sql = f"""
select i.people_id as peopleid,
      "{cats[1]}" as Category,
      i.master_organization_id,      
      case when  upper(il.type) =  'SOCIETIES-ASSOCIATIONS'
      then "Work Affiliation At Society" 
      when  upper(il.type) = 'JOURNAL'
      then "Work Affiliation At Journal" end as Check,
      1 as Flag
from curie.people_institution_latest i
join curie.people_institution_affiliation_latest a
on i.id = a.people_institution_id
join curie.institutions_latest il
on il.id = i.institution_id
where UPPER(a.affiliation_type) IN ('WORK AFFILIATION','NO AFFILIATION') AND UPPER(il.type) in ('SOCIETIES-ASSOCIATIONS','JOURNAL')
"""
n2 = spark.sql(society_journal_work_sql).toPandas()

# At least one society-member affiliation at a society/journal and no work
# affiliation at any other institution type.
society_only_sql = f'''
SELECT i.people_id as peopleid,
       "{cats[1]}" as Category,
       "Society Or Journal Affiliation Only" as Check,
       1 as Flag
from curie.people_institution_latest i
join curie.people_institution_affiliation_latest a
on i.id = a.people_institution_id
join curie.institutions_latest il
on il.id = i.institution_id
GROUP BY i.people_id
HAVING SUM(CASE WHEN upper(a.affiliation_type)='WORK AFFILIATION'
                 AND upper(il.Type) not in ('SOCIETIES-ASSOCIATIONS','JOURNAL')
                THEN 1
                ELSE 0
                END)<1
       AND SUM(CASE WHEN upper(a.affiliation_type)='SOCIETY MEMBER'
                 AND upper(il.Type) in ('SOCIETIES-ASSOCIATIONS','JOURNAL')
                THEN 1
                ELSE 0
                END)>0
'''
n3 = spark.sql(society_only_sql).toPandas()

In [0]:
# Multiple Current Work Affiliations(>5)
# Business Owner: Ana Riquezes
# Data Assessment Owner: Anna Soloveva

# n4 = spark.sql(f'''
# SELECT ppm.PeopleID as peopleid, 
# "{cats[1]}" as Category, 
# "Multiple Current Work Affiliations(>5)" as Check, 
# 1 as Flag
# FROM tesseract_curie.people_latest ppm
# JOIN (SELECT i.PeopleID, count (distinct i.PeopleInstitutionID)
#            FROM tesseract_curie.people_institution_latest i
#            LEFT JOIN tesseract_curie.people_institution_affiliation_latest a ON a.ParentID = i.PeopleInstitutionID
#            WHERE i.IsCurrentAffiliation = 1 AND a.AffiliationType = 'Work Affiliation' AND i.source != 9
#            GROUP BY i.PeopleID
#            HAVING count (distinct i.PeopleInstitutionID) > 5)inst
# ON ppm.PeopleID = inst.PeopleID
# ''').toPandas()

In [0]:
# Section output: the affiliation reports stacked into one frame.
affiliation_reports = [n1, n2, n3]
ppl_aff_df = pd.concat(affiliation_reports)

# People - Emails and Social Media

In [0]:
# Non-empty emails value that does not match the LIKE pattern '%_@__%.__%'.
invalid_email_sql = f'''
SELECT ppm.id as peopleid,
       "{cats[2]}" as Category,
       'Invalid Email' as Check,
       1 as Flag
 from curie.people_latest as ppm
 where emails not LIKE '%_@__%.__%' 
 AND emails != '[]'
 '''
n1 = spark.sql(invalid_email_sql).toPandas()

In [0]:
# People whose emails value lists more than 3 addresses.
# emails is stored as a serialized list such as ["a@x.org","b@y.org"], so N
# addresses are separated by N-1 commas: "more than 3 emails" means more than
# 2 commas. (The previous threshold of >3 commas only flagged 5+ addresses.)
n2 = spark.sql(f'''
SELECT ppm.id as peopleid,
       "{cats[2]}" as Category,
       'More than 3 Emails' as Check,
       1 as Flag
from curie.people_latest ppm
where (length(emails)-length(replace(emails,',','')))>2
 ''').toPandas()

In [0]:
# Section output: the email reports stacked into one frame.
email_reports = [n1, n2]
emails_df = pd.concat(email_reports)

# People - Dupes

In [0]:
# Each report flags every person whose identifier value is shared by more
# than one distinct id in curie.people_latest; empty strings are excluded.

# Shared LinkedIn URL.
dupe_linkedin_sql = f"""
   select ppm.id as peopleid,
       "{cats[3]}" as Category,
          "DuplicateLinkedIn" as Check,
          1 as Flag
     from curie.people_latest as ppm
  join (select linkedin_URL
             from curie.people_latest p
         group by linkedin_URL
           having count(distinct id)>1) l
       on l.linkedin_URL = ppm.linkedin_URL
       and l.linkedin_URL<>''
"""
n1 = spark.sql(dupe_linkedin_sql).toPandas()

# Shared Twitter URL.
dupe_twitter_sql = f'''
   select ppm.id as peopleid,
        "{cats[3]}" as Category,
          'DuplicateTwitter' as Check,
          1 as Flag
     from curie.people_latest as ppm
 join (select twitter_URL
             from curie.people_latest p
         group by twitter_URL
           having count(distinct id)>1) t
       on t.twitter_URL = ppm.twitter_URL
       and t.twitter_URL <>''
       '''
n2 = spark.sql(dupe_twitter_sql).toPandas()

# Shared ORCID.
dupe_orcid_sql = f'''
   select ppm.id as peopleid,
        "{cats[3]}" as Category,
          'DuplicateORCID' as Check,
          1 as Flag
     from curie.people_latest as ppm
     join (select orcid_id
             from curie.people_latest p
            where orcid_id != ''
         group by orcid_id
           having count(distinct id)>1) o
       on o.orcid_id = ppm.orcid_id'''
n3 = spark.sql(dupe_orcid_sql).toPandas()

In [0]:
# Pull every person with a non-empty emails value; the duplicate-email check
# in the next cell works on this frame in pandas.
emails_sql = '''
SELECT ppm.id as peopleid, emails
 from curie.people_latest as ppm
 where emails !='[]'
 '''
emails = spark.sql(emails_sql).toPandas()
emails.head()

Unnamed: 0,peopleid,emails
0,118152,"[""timothy.kitchen@wcahospital.org""]"
1,238216,"[""lmcaleer-leavey@sentara.com""]"
2,290714,"[""zacharydreyfuss@mhd.com"",""zacharydreyfuss@te..."
3,487392,"[""kflores@cpgh.org""]"
4,577349,"[""amyburton@texashealth.org""]"


In [0]:
# Parse the serialized list in each emails value and give every address its
# own row.
emails['emails'] = emails['emails'].map(ast.literal_eval)
emails = emails.explode('emails')

# Addresses attached to more than one distinct person.
owners_per_email = emails.groupby('emails')['peopleid'].nunique()
tmp = owners_per_email[owners_per_email > 1].index.tolist()

#Dupe Emails
dupe_emails = emails[emails['emails'].isin(tmp)]

# One report row per flagged person, in the standard column layout.
n4 = (dupe_emails
      .drop(columns='emails')
      .assign(Category=cats[3], Check='DuplicateEmail', Flag=1)
      .drop_duplicates())

In [0]:
from fuzzywuzzy import fuzz
from fuzzywuzzy import process

# Candidate duplicate pairs: a "source" profile with an MD designation and no
# NPI (10+ publications, US location) and a "target" MD profile that has an
# NPI and co-authors more than 85% of the source's publications. Only the five
# highest-overlap targets per source are kept.
npi_pubs = spark.sql('''
  SELECT *,
         (target_pubs_overlap/source_pubcount)*100 as percentage_overlap
    FROM (
   SELECT a.people_id as sourceid, 
          p.first_name as source_fname,
          p.middle_name as source_mname,
          p.last_name as source_lname,
          p.designations as source_designation,
          spl.publicationCount as source_pubcount,
          b.people_id as target_id,
          p2.npi_number as sourcenpi,
          p2.first_name as target_fname,
          p2.middle_name as target_mname,
          p2.last_name as target_lname,
          p2.designations as target_designation,
          count(distinct b.publications_id) as target_pubs_overlap,
          row_number() over (partition by a.people_id order by count(distinct b.publications_id)/spl.publicationCount desc) as overlap_rank
     FROM (SELECT *
             FROM curie.people_latest pp
            WHERE pp.designations like '%MD%'
              AND pp.npi_number is null) AS p
     JOIN (SELECT *
             FROM curie.sliced_people_latest pl
            WHERE array_contains(pl.locations.country, 'United States') = TRUE
              AND pl.publicationCount>9) AS spl
       on p.id = spl.id
     JOIN curie.publication_authors_latest a 
       on a.people_id = p.id
     JOIN curie.publication_authors_latest b
       ON a.people_id <> b.people_id
      AND a.publications_id = b.publications_id
     JOIN curie.people_latest p2
       ON p2.id = b.people_id
      AND p2.npi_number is not null
      AND p2.npi_number != ''
      AND p2.designations like '%MD%'
 GROUP BY a.people_id, 
          p.first_name,
          p.middle_name,
          p.last_name,
          p.designations,
          spl.publicationCount,
          b.people_id,
          p2.npi_number,
          p2.first_name,
          p2.middle_name,
          p2.last_name,
          p2.designations ) A
    WHERE overlap_rank<6
      AND (target_pubs_overlap/source_pubcount)>.85
''').toPandas()


def _joined_name(frame, prefix):
    """Return '<first> <middle> <last>' per row, skipping missing parts."""
    parts = frame[[f'{prefix}_fname', f'{prefix}_mname', f'{prefix}_lname']].fillna('').astype(str)
    joined = parts.iloc[:, 0] + ' ' + parts.iloc[:, 1] + ' ' + parts.iloc[:, 2]
    # Collapse the gaps left behind by empty parts.
    return joined.str.replace(r'\s+', ' ', regex=True).str.strip()


# Build the names part by part: string concatenation with a NULL part yields
# NaN for the whole name, and the old fillna('-') fallback then compared
# '-' against '-' (score 100) whenever both profiles lacked a middle name.
npi_pubs['source_fn'] = _joined_name(npi_pubs, 'source')
npi_pubs['target_fn'] = _joined_name(npi_pubs, 'target')
# A list comprehension also works when the query returns no rows, where a
# row-wise DataFrame.apply does not produce a column.
npi_pubs['name_match_score'] = [
    fuzz.partial_ratio(source_name.lower(), target_name.lower())
    for source_name, target_name in zip(npi_pubs['source_fn'], npi_pubs['target_fn'])
]
npi_pubs['Check']='DuplicateNPI_Overlap_Pubs'
npi_pubs['Flag'] = 1

matched = npi_pubs[npi_pubs['name_match_score']>75]
# NOTE(review): the original concatenated the *source* slice with itself,
# which doubled every Flag. The second slice now carries the matched target
# ids (renamed to 'sourceid' so the next cell's rename to 'peopleid' covers
# both sides) -- confirm flagging the NPI-bearing profile is intended.
n5 = pd.concat([
    matched[['sourceid','Check','Flag']],
    matched[['target_id','Check','Flag']].rename(columns={'target_id':'sourceid'}),
]).drop_duplicates()
n5['Category'] =  cats[3]


In [0]:
# Align n5 with the standard 'peopleid' column name, then stack the section.
n5 = n5.rename(columns={'sourceid': 'peopleid'})
dupe_reports = [n1, n2, n3, n4, n5]
ppl_dupe_df = pd.concat(dupe_reports)

## People - Registry

In [0]:
# MD/NP designation, no NPI number, and an open-ended (end_date is null) work
# affiliation whose address is in the United States. The address is taken
# either from the person-institution address table or from the institution's
# own address.
# NOTE(review): the first branch joins people_institution_address_latest on
# a.id (the affiliation row) although the column is named
# people_institution_id -- confirm this should not be i.id.
no_npi_us_sql = f'''
SELECT pp.id as  peopleid, 
        "{cats[4]}" as Category,
       'MD or NP in US without NPI' as Check,
       1 as Flag
 FROM curie.people_latest pp
WHERE (upper(pp.designations) like '%MD%' or upper(pp.designations) like '%NP%')
  AND pp.npi_number is null
  AND pp.id in (
select DISTINCT people_id
from (
select i.people_id
from curie.people_institution_latest i
join curie.people_institution_affiliation_latest a
on i.id = a.people_institution_id
join curie.people_institution_address_latest pad
on pad.people_institution_id = a.id
join curie.addresses_latest ad
on ad.id = pad.address_id
where a.end_date is null
AND UPPER(country) = "UNITED STATES"
AND upper(affiliation_type) in ("WORK AFFILIATION")
union
select i.people_id
from curie.people_institution_latest i
join curie.people_institution_affiliation_latest a
on i.id = a.people_institution_id
join curie.institution_address_latest id
on id.institution_id = i.institution_id
join curie.addresses_latest ad
on ad.id = id.address_id
where a.end_date is null
AND UPPER(country) = "UNITED STATES"
AND upper(affiliation_type) in ("WORK AFFILIATION")
) 
)
 '''
n1 = spark.sql(no_npi_us_sql).toPandas()

In [0]:
# MD/NP designation with an NPI number but no open-ended US work affiliation
# (the subquery is the same US-affiliation lookup used by the previous check).
# NOTE(review): NOT IN matches nothing if the subquery ever returns a NULL
# people_id -- confirm people_id cannot be NULL in these tables.
npi_no_us_affiliation_sql = f'''
SELECT DISTINCT pp.id as  peopleid, 
        "{cats[4]}" as Category,
       'NPI without US Work Affiliation' as Check,
       1 as Flag
 FROM curie.people_latest pp
WHERE (upper(pp.designations) like '%MD%' or upper(pp.designations) like '%NP%')
  AND pp.npi_number is not null
  AND pp.id not in (
select DISTINCT people_id
from (
select i.people_id
from curie.people_institution_latest i
join curie.people_institution_affiliation_latest a
on i.id = a.people_institution_id
join curie.people_institution_address_latest pad
on pad.people_institution_id = a.id
join curie.addresses_latest ad
on ad.id = pad.address_id
where a.end_date is null
AND UPPER(country) = "UNITED STATES"
AND upper(affiliation_type) in ("WORK AFFILIATION")
union
select i.people_id
from curie.people_institution_latest i
join curie.people_institution_affiliation_latest a
on i.id = a.people_institution_id
join curie.institution_address_latest id
on id.institution_id = i.institution_id
join curie.addresses_latest ad
on ad.id = id.address_id
where a.end_date is null
AND UPPER(country) = "UNITED STATES"
AND upper(affiliation_type) in ("WORK AFFILIATION")
) 
)
 '''
n2 = spark.sql(npi_no_us_affiliation_sql).toPandas()

In [0]:
# Section output: the registry reports stacked into one frame.
registry_reports = [n1, n2]
ppl_reg = pd.concat(registry_reports)

# People - Publications

In [0]:
# Listed in faculty_opinion_members_latest but publicationCount is below 1.
faculty_opinion_no_pubs_sql = f'''
SELECT id as  peopleid, 
        "{cats[5]}" as Category,
       'Faculty Opinion Author - No Pubs' as Check,
       1 as Flag
from curie.sliced_people_latest as spl
WHERE publicationCount<1 AND id in (	select people_id
                    from curie.faculty_opinion_members_latest)'''
n1 = spark.sql(faculty_opinion_no_pubs_sql).toPandas()


# publicationCount of 1,000 or more.
over_1000_pubs_sql = f'''
SELECT id as  peopleid, 
        "{cats[5]}" as Category,
       'Over 1,000 Publications' as Check,
       1 as Flag
from curie.sliced_people_latest as spl
WHERE publicationCount>999'''
n2 = spark.sql(over_1000_pubs_sql).toPandas()

In [0]:
# Business Owner: Arvind
# Data Assessment Owner: Anna Soloveva

# Authors with a publication dated in 1950 or earlier.
# -599616000 is the Unix timestamp of 1951-01-01 00:00:00 UTC, so the filter
# includes all of 1950 even though the Check label reads "before 1950".
# NOTE(review): the cutoff is passed as a quoted string -- presumably
# date_published holds epoch seconds and Spark casts the literal; confirm.
pubs_before_1950_sql = f'''
SELECT distinct cpa.people_id as peopleid, 
        "{cats[5]}" as Category,
       'Publications before 1950' as Check,
       1 as Flag
from curie.publication_authors_latest  cpa
left join curie.publications_latest cp on cp.id = cpa.publications_id
where cp.date_published < '-599616000' '''
n3 = spark.sql(pubs_before_1950_sql).toPandas()

# Authors with more than 100 distinct publications in a single calendar year.
pubs_over_100_per_year_sql = f'''
SELECT distinct cpa.people_id as peopleid, 
        "{cats[5]}" as Category,
       'Publications > 100 in a given year' as Check,
       1 as Flag
from curie.publication_authors_latest  cpa
join curie.publications_latest cp on cp.id = cpa.publications_id 
group by cpa.people_id, from_unixtime(cp.date_published,'yyyy')
having count(distinct cp.id) > 100 '''
n4 = spark.sql(pubs_over_100_per_year_sql).toPandas()

In [0]:
# Section output: the publication reports stacked into one frame.
publication_reports = [n1, n2, n3, n4]
ppl_pubs = pd.concat(publication_reports)

## Concatenate All Outputs

In [0]:
# Stack every section's output into one long frame (one row per flagged
# person per check).
section_frames = [
    names_df,
    ppl_aff_df,
    emails_df,
    ppl_dupe_df,
    ppl_reg,
    ppl_pubs,
]
final_df = pd.concat(section_frames)
final_df.head()

## Save Results to Table

In [0]:
# Sum the flags per (peopleid, Category, Check) and pivot the checks into
# columns; combinations that never occur become 0.
flag_sums = final_df.groupby(['peopleid','Category','Check'])['Flag'].sum()
final_agg = flag_sums.unstack().fillna(0)
check_cols = final_agg.columns
final_agg[check_cols] = final_agg[check_cols].astype('int32')
final_agg = final_agg.sort_index(axis=1)

# Stamp the run date and collapse to one row per date holding the total
# flag count of every check.
final_agg['Date'] = as_of
final_agg = final_agg.groupby('Date').sum()
final_agg.head()

#jira creds
# NOTE(review): an API token does not belong in source. It is now read from
# the JIRA_API_TOKEN environment variable; the literal is kept only as a
# fallback so existing runs keep working -- rotate the token and delete it.
JIRA_API_TOKEN = os.environ.get('JIRA_API_TOKEN', 'rctxzsxvevDVfZPFR8FM0922')
QA_jira = js.JIRA('https://h1insights.atlassian.net',
                  basic_auth=('drake.bushnell@h1.co',
                  JIRA_API_TOKEN))

# final_agg is a pandas frame with one row for this run date and one column
# per check, so the Spark-style .select()/.show()/.toPandas() calls used here
# before raised, and nunique() on a single row could only ever report 1.
# The figure posted is the summed flag count of the DuplicateEmail check
# (0 when no duplicate emails were found and the column is absent).
if 'DuplicateEmail' in final_agg.columns:
    duplicate_email_count = int(final_agg['DuplicateEmail'].sum())
else:
    duplicate_email_count = 0
print(f"DuplicateEmail: {duplicate_email_count:,}")

QA_jira.add_comment('CQA-950',f"""{as_of} Update:

* Duplicate Emails: {duplicate_email_count:,}
""")

In [0]:
# run_flag == 0 appends this run to the history already stored in
# tableau.data_qa_weekly; any other value keeps only this run.
run_flag = 0

# Long format: one row per (Date, Check) with this run's total as 'Count'.
df_agg = final_agg.stack().reset_index().rename(columns={0:'Count'})
if run_flag == 0:
  # Load the stored history.
  history_sql = '''
  select *
  from tableau.data_qa_weekly'''
  full_df = spark.sql(history_sql).toPandas()

  # History first, then this run; identical rows are dropped.
  df_agg = pd.concat([full_df, df_agg]).drop_duplicates()
  df_agg = df_agg.sort_index(axis=1)
  





In [0]:
%sql
-- Drop the stored weekly QA table; the following cell recreates it from df_agg via saveAsTable.
DROP TABLE IF EXISTS tableau.data_qa_weekly

In [0]:
# Table for Tableau: write the combined history (df_agg) back to
# tableau.data_qa_weekly, replacing whatever is stored there.
tbl = spark.createDataFrame(df_agg)
table_writer = tbl.write.mode('overwrite')
table_writer.saveAsTable('tableau.data_qa_weekly')

### Calculate aggregates

In [0]:
# Wide view of the history: one row per Date, one column per Check, holding
# the smallest Count recorded for that pair.
min_counts = df_agg.groupby(['Date','Check'])['Count'].min()
out = min_counts.unstack()

In [0]:
# Authorize pygsheets with the service-account file and open the report
# spreadsheet by its key.
sheets_credentials = '/dbfs/FileStore/tables/coverage/sheets.json'
report_sheet_key = '1rvKXdAPUl9wDDIC0NpwFZz5oC8bDBfLHD9nBnlbHTtw'
gc = pygsheets.authorize(service_file=sheets_credentials)
grpt = gc.open_by_key(report_sheet_key)



In [0]:
# Rewrite one worksheet per category (the tab title equals the category
# name) with the per-date history of that category's checks, newest first.
for i, category in enumerate(cats):
  cols = final_df.loc[final_df.Category == category, 'Check'].unique().tolist()
  category_history = df_agg[df_agg['Check'].isin(cols)]
  tmp = (category_history.groupby(['Date','Check'])['Count']
         .min()
         .unstack()
         .sort_index(ascending=False)
         .reset_index())
  sh = grpt.worksheet_by_title(category)
  sh.clear()
  sh.set_dataframe(tmp.sort_values("Date",ascending=False),(1,1))

## Create Master Report for Data Analysis

In [0]:
# Per-person report: one row per peopleid, one column per Check holding the
# flag count, plus a 'Total Violations' row total; worst offenders first.
# master_organization_id (only emitted by one affiliation report) is left
# out so it cannot defeat the de-duplication.
report_cols = [name for name in final_df.columns.tolist() if name != 'master_organization_id']

# Aggregate the Flag column only. The previous version summed every selected
# column -- including the peopleid/Category/Check grouping columns -- so after
# droplevel() the frame held duplicate, non-flag columns and 'Total
# Violations' added them all together.
rpt = (final_df[report_cols]
       .drop_duplicates()
       .groupby(['peopleid','Check'])[['Flag']]
       .sum()
       .unstack()
       .fillna(0))
# Drop the outer 'Flag' level so the columns are just the check names.
rpt =rpt.droplevel(level=0,axis=1)
rpt['Total Violations'] = rpt.sum(axis=1)
rpt.sort_values(['Total Violations'],inplace=True,ascending=False)
rpt.head()

In [0]:
# Send the per-person report to the "IDs" worksheet, replacing its contents.
sh = grpt.worksheet_by_title("IDs")
sh.clear()
ids_frame = rpt.reset_index()
sh.set_dataframe(ids_frame, (1, 1))

# Share results to Slack

In [0]:
#sending out the email to slack
# Emails a link to the QA spreadsheet to the Slack channel's email address.

message = EmailMessage()

#set up sender & recipient
sender = "drake.bushnell@h1.co"
recipient = 'data-qa-one-team-aaaafheyazufnbfpuxqelx6ayi@h1insights.slack.com'
message['From'] = sender
message['To'] = recipient

#set up subject
message['Subject'] = f'QA - Weekly Report - {as_of}'

#body
body = '''Access the QA report here: https://docs.google.com/spreadsheets/d/1rvKXdAPUl9wDDIC0NpwFZz5oC8bDBfLHD9nBnlbHTtw/edit?usp=sharing

'''
message.set_content(body)

#mail server config
# NOTE(review): a mail password does not belong in source. It is now read from
# the SMTP_PASSWORD environment variable; the literal is kept only as a
# fallback so existing runs keep working -- rotate it and delete the literal.
smtp_password = os.environ.get('SMTP_PASSWORD', "cfvamzzezgbdztaz")

# The context manager sends QUIT and closes the connection even when login or
# send_message raises; previously a failure left the connection open.
with smtplib.SMTP_SSL('smtp.gmail.com') as mail_server:
    # NOTE(review): debug level 1 echoes the SMTP conversation, including the
    # AUTH exchange, into the notebook output -- consider turning it off.
    mail_server.set_debuglevel(1)
    mail_server.login(sender, smtp_password)
    mail_server.send_message(message)