# Primeira etapa do desafio

1.	Realize a extração dos dados dos 3 ultimos meses de prescrição (english-prescribing-data-epd) sem contar o ultimo, fonte: https://opendata.nhsbsa.net/dataset/english-prescribing-data-epd

## Mês de Junho
- Por se tratar de uma grande massa de dados, optei por fazer o processamento individualmente por mês em cada notebook.
- Tambem por esse motivo optei por fazer todo o armazemento e excução na minha maquina local, para evitar problemas de custos extras pelo volume de dados armazenados e processados

---

### Fazendo as importações dos frameworks/metodos necessarios para executar as funções do jupyter 

In [1]:
import pandas as pd
import pyspark.sql.functions as func

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'E:\\spark-3.3.1-bin-hadoop3'

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
spark = SparkSession.builder.getOrCreate()

### Criando o DF principal:
- Aqui optei por deixar comentado a amostra da visualização dos dados no Spark, pois pelo tamanho massivo do DF, a representação não estava satisfatoria ao meu ver. Mais a baixo ultilizarei o pandas como alternativa.

In [5]:
df_spark = spark.read.csv(r'C:\Users\arthu\Desktop\EPD_202206.csv', sep = ',', inferSchema=True, header=True)
# df_spark.show(10)


### Conseguindo a importação pela API
- Alternativa de importação dos dados usando API, pois a fornecida pelo site não foi compativel para mim.


In [6]:
#import urllib.request

#url = 'https://opendata.nhsbsa.net/dataset/65050ec0-5abd-48ce-989d-defc08ed837e/resource/bc3cb910-f841-47e0-9fb6-203d141c57d6/download/epd_202208.csv'  
#fileobj = urllib.request.urlopen(url)

#for linha in fileobj:
#    print(linha)
#    break

### Obtendo as informações necessarias da importação realizada :
- Concluindo a validação dos dados extraídos

In [7]:
print("Total de linhas: ", df_spark.count(),"\n" # Conferindo se todas as linhas foram importadas por completo
      "Total de colunas: ", len(df_spark.columns)) # Numero de colunas
df_spark.printSchema() # Conferindo o nome das colunas para consulta

Total de linhas:  17603900 
Total de colunas:  26
root
 |-- YEAR_MONTH: integer (nullable = true)
 |-- REGIONAL_OFFICE_NAME: string (nullable = true)
 |-- REGIONAL_OFFICE_CODE: string (nullable = true)
 |-- ICB_NAME: string (nullable = true)
 |-- ICB_CODE: string (nullable = true)
 |-- PCO_NAME: string (nullable = true)
 |-- PCO_CODE: string (nullable = true)
 |-- PRACTICE_NAME: string (nullable = true)
 |-- PRACTICE_CODE: string (nullable = true)
 |-- ADDRESS_1: string (nullable = true)
 |-- ADDRESS_2: string (nullable = true)
 |-- ADDRESS_3: string (nullable = true)
 |-- ADDRESS_4: string (nullable = true)
 |-- POSTCODE: string (nullable = true)
 |-- BNF_CHEMICAL_SUBSTANCE: string (nullable = true)
 |-- CHEMICAL_SUBSTANCE_BNF_DESCR: string (nullable = true)
 |-- BNF_CODE: string (nullable = true)
 |-- BNF_DESCRIPTION: string (nullable = true)
 |-- BNF_CHAPTER_PLUS_CODE: string (nullable = true)
 |-- QUANTITY: double (nullable = true)
 |-- ITEMS: integer (nullable = true)
 |-- TOTAL_Q

## Começando os tratamentos:
## 4.	Após a coleta dos dados, separe os dados entre prescribers e prescriptions.

### Prescribers / Prescritores
- Separando o df_spark em Prescribers e as colunas que considerei está relacionado na leitura da documentação.
- Como ultilizarei o pandas e estava tendo problemas com a memoria para processamento, limitei apenas para uma amostra de 150000 linhas
> Prescribers (Prescritores) = PRACTICE_NAME

In [8]:
df_prescribers = df_spark.select("YEAR_MONTH",
                                 "PRACTICE_NAME",
                                 "REGIONAL_OFFICE_NAME",
                                 "ICB_NAME",
                                 "PCO_NAME"
# Aqui tive que optar com limitar a quantidade de linhas, pois ao ultilizar o pandas para salvar o df estava estourando a memoria                                 
                                 ).limit(150000)

df_prescribers.show(50)

+----------+--------------------+--------------------+--------------------+--------------------+
|YEAR_MONTH|       PRACTICE_NAME|REGIONAL_OFFICE_NAME|            ICB_NAME|            PCO_NAME|
+----------+--------------------+--------------------+--------------------+--------------------+
|    202206|WIRRAL COMMUNITY NMP|          NORTH WEST|NHS CHESHIRE AND ...|WIRRAL COMMUNITY ...|
|    202206|WIRRAL WIC (APH)_...|          NORTH WEST|NHS CHESHIRE AND ...|WIRRAL COMMUNITY ...|
|    202206|BASSETLAW HEALTH ...|NORTH EAST AND YO...|NHS SOUTH YORKSHI...|NHS NOTTINGHAM AN...|
|    202206|BASSETLAW HEALTH ...|NORTH EAST AND YO...|NHS SOUTH YORKSHI...|NHS NOTTINGHAM AN...|
|    202206|BASSETLAW HEALTH ...|NORTH EAST AND YO...|NHS SOUTH YORKSHI...|NHS NOTTINGHAM AN...|
|    202206|BASSETLAW HEALTH ...|NORTH EAST AND YO...|NHS SOUTH YORKSHI...|NHS NOTTINGHAM AN...|
|    202206|BASSETLAW HEALTH ...|NORTH EAST AND YO...|NHS SOUTH YORKSHI...|NHS NOTTINGHAM AN...|
|    202206|BASSETLAW HEALTH .

### Prescriptions / Prescrições
- Separando o df_spark em Prescriptions e as colunas que considerei está relacionado na leitura da documentação.
- Amostra limitada 200000 linhas.
> Prescriptions (Prescrições) = BNF_DESCRIPTION

In [9]:
df_prescriptions = df_spark.select("YEAR_MONTH",
                                   "BNF_DESCRIPTION",
                                   "CHEMICAL_SUBSTANCE_BNF_DESCR",
                                   "QUANTITY",
                                   "ITEMS",
                                   "TOTAL_QUANTITY",
                                   "NIC",
                                   "ACTUAL_COST"
# Mais uma vez limitando a quantidade de linhas por conta da memoria
                                   ).limit(200000)


df_prescriptions.show(50)

+----------+--------------------+----------------------------+--------+-----+--------------+------+-----------+
|YEAR_MONTH|     BNF_DESCRIPTION|CHEMICAL_SUBSTANCE_BNF_DESCR|QUANTITY|ITEMS|TOTAL_QUANTITY|   NIC|ACTUAL_COST|
+----------+--------------------+----------------------------+--------+-----+--------------+------+-----------+
|    202206|Viscopaste PB7 ba...|          Arm Sling/Bandages|    10.0|    1|          10.0|  38.9|   36.40326|
|    202206|Mepore dressing 1...|        Wound Management ...|     5.0|    1|           5.0|  1.85|    1.74307|
|    202206|Dressit sterile d...|        Wound Management ...|    10.0|    6|          60.0|  41.4|    38.7544|
|    202206|Dressit sterile d...|        Wound Management ...|    20.0|    2|          40.0|  27.6|   25.81973|
|    202206|Allevyn Adhesive ...|        Wound Management ...|    10.0|    2|          20.0|  46.6|   43.60659|
|    202206|Biatain Adhesive ...|        Wound Management ...|    30.0|    1|          30.0|  81.6|   76

#### Extra:
- Fazendo o fatiamento da coluna 
> YEAR_MONTH em YEAR e MONTH
- Posteriormente vi que não seria necessario a ultilização.


In [10]:
df_YEAR_MONTH_split = df_prescriptions.withColumn('YEAR', func.substring('YEAR_MONTH',1,4))\
.withColumn('MONTH',func.substring('YEAR_MONTH',5,2))

df_YEAR_MONTH_split.show()

+----------+--------------------+----------------------------+--------+-----+--------------+------+-----------+----+-----+
|YEAR_MONTH|     BNF_DESCRIPTION|CHEMICAL_SUBSTANCE_BNF_DESCR|QUANTITY|ITEMS|TOTAL_QUANTITY|   NIC|ACTUAL_COST|YEAR|MONTH|
+----------+--------------------+----------------------------+--------+-----+--------------+------+-----------+----+-----+
|    202206|Viscopaste PB7 ba...|          Arm Sling/Bandages|    10.0|    1|          10.0|  38.9|   36.40326|2022|   06|
|    202206|Mepore dressing 1...|        Wound Management ...|     5.0|    1|           5.0|  1.85|    1.74307|2022|   06|
|    202206|Dressit sterile d...|        Wound Management ...|    10.0|    6|          60.0|  41.4|    38.7544|2022|   06|
|    202206|Dressit sterile d...|        Wound Management ...|    20.0|    2|          40.0|  27.6|   25.81973|2022|   06|
|    202206|Allevyn Adhesive ...|        Wound Management ...|    10.0|    2|          20.0|  46.6|   43.60659|2022|   06|
|    202206|Biat

## 5. Persista os dados da forma que achar melhor:
- Dados persistidos no formado CSV

### Usando o pandas para salvar os DFs de 'Prescriptions' e 'Prescribers'

In [11]:
df_prescriptions.toPandas().to_csv('Warehouse/Mes_06/df_prescriptions.csv', index=False)

df_prescribers.toPandas().to_csv('Warehouse/Mes_06/df_prescribers.csv', index=False)

### Conferindo se os arquivos foram corretamente exportados e armazenados 


In [12]:
# Lendo a amostra dos arquivos 
df_check_prescriptions = spark.read.csv(r'Warehouse/Mes_07/df_prescriptions.csv', sep = ',', inferSchema=True, header=True)

df_check_prescriptions.show()

+----------+--------------------+----------------------------+--------+-----+--------------+------+-----------+
|YEAR_MONTH|     BNF_DESCRIPTION|CHEMICAL_SUBSTANCE_BNF_DESCR|QUANTITY|ITEMS|TOTAL_QUANTITY|   NIC|ACTUAL_COST|
+----------+--------------------+----------------------------+--------+-----+--------------+------+-----------+
|    202207|Simpla G-Strap adult|        Tubing And Access...|     5.0|    1|           5.0| 15.23|   14.24062|
|    202207|Salts adhesive re...|        Adhesive Removers...|   100.0|    7|         700.0| 97.86|   91.50275|
|    202207|Salts adhesive re...|        Adhesive Removers...|   350.0|    1|         350.0| 48.93|   45.75137|
|    202207|Salts adhesive re...|        Adhesive Removers...|   150.0|    1|         150.0| 20.97|   19.60773|
|    202207|Salts adhesive re...|        Adhesive Removers...|    50.0|    1|          50.0|  6.99|    6.53591|
|    202207|Salts adhesive re...|        Adhesive Removers...|   200.0|    2|         400.0| 55.92|   52

In [13]:
# Lendo a amostra dos arquivos
df_check_prescribers = spark.read.csv(r'Warehouse/Mes_07/df_prescribers.csv', sep = ',', inferSchema=True, header=True)

df_check_prescribers.show()

+----------+-----------------+--------------------+--------------------+--------------------+
|YEAR_MONTH|    PRACTICE_NAME|REGIONAL_OFFICE_NAME|            ICB_NAME|            PCO_NAME|
+----------+-----------------+--------------------+--------------------+--------------------+
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBER AND NO...|NHS HUMBER AND NO...|
|    202207|RIPON SPA SURGERY|NORTH EAST AND YO...|NHS HUMBE

## 6. Gere scripts que atendam as solicitações abaixo:

- Aqui optei por ultilizar SQL por ter maior dominio, ao invez de mesclar com o PySpark.

#### Criando a View que ultilizarei nas consultas SQL:

In [14]:
df_spark.createOrReplaceTempView("query")

### 1º Query:
- a. Crie um dataframe contendo os 10 principais produtos químicos prescritos por região.

In [15]:
df_query1_sql = spark.sql("""
with quimico as(
    SELECT 
        REGIONAL_OFFICE_NAME as Regiao,
        CHEMICAL_SUBSTANCE_BNF_DESCR as Substancia,
        sum(TOTAL_QUANTITY) as Total
    FROM query
    GROUP BY 1,2),
top10 as (
    select *,
    row_number() over (partition by Regiao order by Total desc, Substancia ) as rank
    from quimico)

select Rank,
       Regiao,
       Substancia,
       Total
from top10
where rank <= 10
order by Regiao, Total desc;
""")


#### Setando valores float sem notação científica
- Por padrão o pandas vem com notação cientifica. Para uma melhor visualização, optei por alterar

In [16]:
pd.options.display.float_format = '{:.2f}'.format

### Extra:
> Como já informei anteriormente, não gosto da visualização com o pyspark, por ser tratar de um DF relativamente pequeno, optei por utilizar o pandas.

Mas para esse caso achei interessante mostrar todas as linhas com o spark para observer a coluna "Rank"


In [17]:
# Conta todas as linhas contidas na query
all_lines = df_query1_sql.count() 

# Imprime as linhas resultantes
df_query1_sql.show(all_lines) 

+----+--------------------+--------------------+------------+
|Rank|              Regiao|          Substancia|       Total|
+----+--------------------+--------------------+------------+
|   1|     EAST OF ENGLAND|   Enteral nutrition|1.93741913E8|
|   2|     EAST OF ENGLAND|          Emollients| 4.1260874E7|
|   3|     EAST OF ENGLAND|Other food for sp...| 2.5345743E7|
|   4|     EAST OF ENGLAND|Metformin hydroch...| 1.9476258E7|
|   5|     EAST OF ENGLAND|         Paracetamol| 1.9237946E7|
|   6|     EAST OF ENGLAND|Other emollient p...| 1.7486945E7|
|   7|     EAST OF ENGLAND|        Atorvastatin| 1.7441278E7|
|   8|     EAST OF ENGLAND|Co-codamol (Codei...| 1.5366863E7|
|   9|     EAST OF ENGLAND|Alginic acid comp...| 1.3453583E7|
|  10|     EAST OF ENGLAND|           Lactulose| 1.2858787E7|
|   1|              LONDON|   Enteral nutrition|2.90916061E8|
|   2|              LONDON|          Emollients| 7.2505881E7|
|   3|              LONDON|Other emollient p...| 3.2726201E7|
|   4|  

#### Retirando o limitador maximo de linhas (Doido pra quebrar  o pandas kkkk)
- pd.set_option('display.max_rows', None)

#### Não fico muito interessante no github ( Muitas linhas para descer, então resetei para o padrão kkk)
- pd.reset_option('^display.', silent=True)

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento
- No resultado da query optei por mostrar um "Rank" para oferecer mais clareza nos dados coletados

In [18]:
df_query1_sql.toPandas().to_csv('Warehouse/Mes_06/df_query1_sql.csv', index=False)
pd.read_csv('Warehouse/Mes_06/df_query1_sql.csv')

Unnamed: 0,Rank,Regiao,Substancia,Total
0,1,EAST OF ENGLAND,Enteral nutrition,193741913.00
1,2,EAST OF ENGLAND,Emollients,41260874.00
2,3,EAST OF ENGLAND,Other food for special diet preparations,25345743.00
3,4,EAST OF ENGLAND,Metformin hydrochloride,19476258.00
4,5,EAST OF ENGLAND,Paracetamol,19237946.00
...,...,...,...,...
75,6,UNIDENTIFIED,Amoxicillin,67636.00
76,7,UNIDENTIFIED,Other emollient preparations,63975.00
77,8,UNIDENTIFIED,Catheters,47074.00
78,9,UNIDENTIFIED,Co-codamol (Codeine phosphate/paracetamol),43148.00


### 2º query
- b. Quais produtos químicos prescritos tiveram a maior somatória de custos por mês? Liste os 10 primeiros.
- Os custos são dados em Libras esterlinas (£)
- Com mais tempo seria interessante implementa um web scraping para converter em R$ pela cotação do dia.

In [19]:
df_query2_sql = spark.sql("""
with principal as(
    SELECT 
        YEAR_MONTH as ano_mes,
        CHEMICAL_SUBSTANCE_BNF_DESCR as Substancia,
        round(sum(NIC),2) as Total
    FROM query
    GROUP BY 1,2),
top10 as (
    select *,
    row_number() over (partition by ano_mes order by Total desc, Substancia ) as rank
    from principal)

select Rank,
       ano_mes,
       Substancia,
       Total
from top10
where rank <= 10
order by ano_mes, Total desc;
""")

In [20]:
# Mostrando como seria a visualização pelo PySpark
#df_query2_sql.show()

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [21]:
df_query2_sql.toPandas().to_csv('Warehouse/Mes_06/df_query2_sql.csv', index=False)
pd.read_csv('df_query2_sql.csv')

Unnamed: 0,Rank,ano_mes,Substancia,Total
0,1,202208,Apixaban,36826136.54
1,2,202208,Enteral nutrition,27234545.33
2,3,202208,Beclometasone dipropionate,25686965.16
3,4,202208,Rivaroxaban,20612972.7
4,5,202208,Catheters,13313142.74
5,6,202208,Detection Sensor Interstitial Fluid/Gluc,12810018.1
6,7,202208,Edoxaban,11134728.5
7,8,202208,Dapagliflozin,10963750.15
8,9,202208,Wound Management & Other Dressings,10731790.36
9,10,202208,Budesonide,10586317.29


### 3º Query
- c. Quais são as precrições mais comuns?
- Como o desafio não especifica um limite, opto por uma amostra listando as 100 primeiras.

In [22]:
df_query3_sql = spark.sql("""
SELECT
    BNF_DESCRIPTION as Prescricao,
    sum(ITEMS) as Quantidade
FROM query
GROUP BY 1 
order by 2 desc
limit 100;
--Caso necessite uma amostra maior é só ajustar o limit
""")

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [23]:
df_query3_sql.toPandas().to_csv('Warehouse/Mes_06/df_query3_sql.csv', index=False)
pd.read_csv('Warehouse/Mes_08/df_query3_sql.csv')

Unnamed: 0,Prescricao,Quantidade
0,Omeprazole 20mg gastro-resistant capsules,2659334
1,Atorvastatin 20mg tablets,2349300
2,Amlodipine 5mg tablets,1748969
3,Lansoprazole 30mg gastro-resistant capsules,1633842
4,Atorvastatin 40mg tablets,1309360
...,...,...
95,Diazepam 2mg tablets,196878
96,Candesartan 8mg tablets,192005
97,Mometasone 50micrograms/dose nasal spray,190200
98,Docusate 100mg capsules,189553


### 4º query
- d. Qual produto químico é mais prescrito por cada prescriber?


In [24]:
df_query4_sql = spark.sql("""
with test as(
    SELECT 
        PRACTICE_NAME as Prescriber,
        CHEMICAL_SUBSTANCE_BNF_DESCR as Substancia,
        sum(ITEMS) as Quantidade
    FROM query
    GROUP BY 1,2),
top1 as (
    select *,
    row_number() over (partition by prescriber order by Quantidade desc, Substancia ) as qtd
    from test)

select Prescriber,
       Substancia,
       Quantidade
from top1
where qtd = 1
order by prescriber, Quantidade desc;

""")

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [25]:
df_query4_sql.toPandas().to_csv('Warehouse/Mes_06/df_query4_sql.csv', index=False)
pd.read_csv('Warehouse/Mes_06/df_query4_sql.csv')

Unnamed: 0,Prescriber,Substancia,Quantidade
0,(FRACTURE CLINIC) NORTH OOH,Amoxicillin,423
1,(IRLAM) SALFORD CARE CTRS MEDICAL PRACTI,Atorvastatin,299
2,(OUT PATIENT DEPARTMENT) NORTH OOH,Amisulpride,1
3,0-19 EAST CHESHIRE HEALTH VISITORS,Other barrier preparations,1
4,0-19 PUBLIC HEALTH SERVICE HARTLEPOOL,Clotrimazole,1
...,...,...,...
8382,YOUR HEALTHCARE NON MED PRES,Wound Management & Other Dressings,119
8383,YOXALL,Atorvastatin,543
8384,ZAIN MEDICAL CENTRE,Metformin hydrochloride,139
8385,ZAMAN,Atorvastatin,576


<a name="ir"></a>
### 5º Query
e. Quantos prescribers foram adicionados no ultimo mês? 

## Por ultilizar um metodo diferente, optei por deixar para o final...


<a name="voltar"></a>
### 6º query
- f. Quais prescribers atuam em mais de uma região? Ordene por quantidade de regiões antendidas.

- Aqui acrescentei qual seria a região mais atendida.
- Pode ser ajustado no 'where top = "n" ' para apresentar em sequencia qual seria a 2º,3º,etc.
- Retirei os UNIDENTIFIED DOCTORS, por se tratar de um dado impreciso e ser o que tinha o maior numero de regiões (8 ao total).

In [26]:
df_query6_sql = spark.sql(
    """with regiao as ( 
    SELECT
        PRACTICE_NAME as Prescribers,
        REGIONAL_OFFICE_NAME as Regiao_principal
    FROM query
    group by 1,2),
filtro as(
    select *,
    row_number() over (partition by Prescribers order by Regiao_principal) as Qtd_Regioes,
    rank() over (partition by Prescribers order by Regiao_principal desc) as top 
    from regiao)

select Prescribers,
       Regiao_principal,
       Qtd_Regioes
from filtro
where top = 1 and Qtd_Regioes >= 2 and Prescribers not in ('UNIDENTIFIED DOCTORS')
order by Qtd_Regioes desc, Prescribers;
""")

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [27]:
df_query6_sql.toPandas().to_csv('Warehouse/Mes_06/df_query6_sql.csv', index=False)
pd.read_csv('Warehouse/Mes_06/df_query6_sql.csv')

Unnamed: 0,Prescribers,Regiao_principal,Qtd_Regioes
0,VILLAGE SURGERY,SOUTH WEST,7
1,HIGH STREET SURGERY,SOUTH EAST,6
2,CENTRAL SURGERY,NORTH WEST,5
3,COMMUNITY DERMATOLOGY SERVICE,SOUTH WEST,5
4,RIVERSIDE SURGERY,SOUTH EAST,5
...,...,...,...
212,WILLOW TREE SURGERY,LONDON,2
213,WINDMILL SURGERY,MIDLANDS,2
214,WOODLANDS HEALTH CENTRE,SOUTH EAST,2
215,WOODSIDE MEDICAL CENTRE,NORTH WEST,2


### 7º query
- g. Qual o preço médio dos químicos prescritos em no ultimo mês coletado?
- Aqui por ter sido um query mais simples, optei por acrescentar algumas informações extras como:
- Os custos são dados em Libras esterlinas (£)

1. Valor_Medio_liquido = Valor Com descontos totais 
2. Maior_valor = O maior valor encontrado do quimico
3. Menor_valor = O menor valor encontrado do quimico



In [28]:
df_query7_sql = spark.sql("""
SELECT 
    CHEMICAL_SUBSTANCE_BNF_DESCR as Substancia,
    round(avg(NIC),2) as Valor_Medio_bruto,
    round(avg(ACTUAL_COST),2) as Valor_Medio_liquido,
    round(max(ACTUAL_COST),2) as Maior_valor,
    round(min(ACTUAL_COST),2) as Menor_valor
FROM query
where CHEMICAL_SUBSTANCE_BNF_DESCR != ""
GROUP BY 1
order by 1,4 desc
""")

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [29]:
# Exportando o resultado para um csv
df_query7_sql.toPandas().to_csv('Warehouse/Mes_06/df_query7_sql.csv', index=False)
pd.read_csv('Warehouse/Mes_06/df_query7_sql.csv')

Unnamed: 0,Substancia,Valor_Medio_bruto,Valor_Medio_liquido,Maior_valor,Menor_valor
0,Abatacept,1965.60,1838.88,3394.75,283.01
1,Abiraterone,683.75,639.76,639.76,639.76
2,Absorbent Cottons,10.63,9.96,52.83,0.78
3,Acamprosate calcium,39.46,37.10,1395.13,0.25
4,Acarbose,28.23,26.52,284.63,0.57
...,...,...,...,...,...
1437,Zonisamide,95.22,89.16,17820.83,0.57
1438,Zopiclone,3.78,3.95,311.84,0.03
1439,Zuclopenthixol acetate,45.19,42.32,108.83,4.53
1440,Zuclopenthixol decanoate,25.96,24.37,347.88,2.95


### 8º query
- h. Gere uma tabela que contenha apenas a prescrição de maior valor de cada "usuário".
- P.s.: Aqui interpletei 
    > Usuario > Prescriber

In [30]:
df_query8_sql = spark.sql("""
with valor as(
    SELECT 
        PCO_NAME as Prescriber,
        CHEMICAL_SUBSTANCE_BNF_DESCR as Substancia,
        round(max(ACTUAL_COST),2) as Maior_valor
    FROM query
    GROUP BY 1,2),
top1 as (
    select *,
    row_number() over (partition by prescriber order by Maior_valor desc, Substancia ) as qtd
    from valor )

select Prescriber,
       Substancia,
       Maior_valor
from top1
where qtd = 1
order by prescriber, Maior_valor desc;
--order by Maior_valor desc, prescriber; só para reordenar por maior valor
""")

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento


In [31]:
df_query8_sql.toPandas().to_csv('Warehouse/Mes_06/df_query8_sql.csv', index=False)
pd.read_csv('Warehouse/Mes_06/df_query8_sql.csv')

Unnamed: 0,Prescriber,Substancia,Maior_valor
0,ABOUT HEALTH,Isotretinoin,2558.95
1,AIREDALE NHS FOUNDATION TRUST,Wound Management & Other Dressings,231.44
2,BERKSHIRE HEALTHCARE NHS FOUNDATION TRUS,Anal Irrigation System,3886.15
3,BIRMINGHAM COMMUNITY HEALTHCARE NHS FOUN,Melatonin,15821.88
4,BIRMINGHAM WOMEN'S AND CHILDREN'S NHS FO,Lisdexamfetamine dimesylate,599.45
...,...,...,...
304,WIRRAL COMMUNITY HEALTH AND CARE NHS FOU,Sacubitril/valsartan,2227.33
305,"WRIGHTINGTON, WIGAN AND LEIGH NHS FOUNDA",Goserelin acetate,1978.69
306,WYE VALLEY NHS TRUST,Wound Management & Other Dressings,1129.20
307,YORK AND SCARBOROUGH TEACHING HOSPITALS,Sacubitril/valsartan,257.00


---
# Aqui indico verificar o notebook 'Rotina_mensal' para entender melhor o que foi informado a respeito da extração das amostras utilizadas aqui.
- [Abrir Rotina_mensal](Rodina_Mensal.ipynb)
---

<a name="query5"></a>
## Retomando a 5º Query
- e. Quantos Prescribers foram adicionados no ultimo mês? 


### Com os arquivos já extraidos seguimos da seguinte formar:
- Criação de 2 DFs referentes a cada mês (agosto e junho)

In [32]:
df_amostra_ultimo = spark.read.csv(r'C:\Users\arthu\Desktop\EPD_202207-00.csv', sep = ',', inferSchema=True, header=True)
df_amostra_anterior = spark.read.csv(r'C:\Users\arthu\Desktop\EPD_202206-00.csv', sep = ',', inferSchema=True, header=True)

#### Filtro as colunas que julgo apenas necessarias:
- YEAR_MONTH
- PRACTICE_NAME

In [33]:
df_filtro_1 = df_amostra_anterior.select('YEAR_MONTH','PRACTICE_NAME')
df_filtro_2 = df_amostra_ultimo.select('YEAR_MONTH','PRACTICE_NAME')

#### Faço a união dos valores distindos de cada DF:
- Dessa forma serão unidos apenas os valores unicos referentes a cada mês.

In [34]:
add_no_mes = df_filtro_1.union(df_filtro_2).distinct()

#### Criando a View SQL para proxima consulta:

In [35]:
add_no_mes.createOrReplaceTempView("query_amostra")

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [36]:
add_no_mes.toPandas().to_csv('Warehouse/Mes_06/add_no_mes.csv', index=False)
add_no_mes = pd.read_csv(r'Warehouse/Mes_06/add_no_mes.csv')
add_no_mes

Unnamed: 0,YEAR_MONTH,PRACTICE_NAME
0,202206,"DR PUZEY,DR KOTHARI AND DR NANDA"
1,202206,ISLE OF WIGHT NHS TRUST NMP
2,202206,URGENT TREATMENT CENTRE/MINOR INJURIES
3,202206,FRIMLEY - COMMUNITY DERMATOLOGY SERVICE
4,202206,BASSET LAW DERMATOLOGY SERVICE
...,...,...
15176,202207,RISEDALE SURGERY
15177,202207,RICKLETON MEDICAL CENTRE
15178,202207,WESTCROFT HOUSE SURGERY
15179,202207,BISHOP AUCKLAND PRIMARY CARE SERVICES


### E numa consulta simples de SQL retorno os prescribers adicionados no ultimo mês
- Lembrando que por se tratar apenas de uma amostra dos dados, temos o retorno apenas do **total dessa amostra**
- Aqui conto e seleciono os valor com **having count(PRACTICE_NAME) = 1** , pois o nome só se repetira 2x se estiver contido nos dois meses.
- **where YEAR_MONTH = 202208** não se torna necessario nesse caso, pois não tivemos prescribers no mes de julho que foram retirados no mês de agosto, pelo menos dentro dessa amostra.

In [46]:
df_add_no_mes_sql = spark.sql("""
SELECT PRACTICE_NAME as Prescribers, 
       count(PRACTICE_NAME) as Novo
FROM query_amostra
--where YEAR_MONTH = 202207 -- Deixei comentado por poder ficar redundante a informação
group by PRACTICE_NAME
having count(PRACTICE_NAME) = 1
order by PRACTICE_NAME;
""")


#### Por ter encontrado problema como o 'case' na integração SQL > spark, optei por fazer apenas pelo spark.
- Aqui quis substituir a na linha o valor '1' por 'Sim'

In [47]:
df_add_no_mes_sql = df_add_no_mes_sql.withColumn("Novo", when(col("Novo") == "1","Sim")
      .otherwise("Nao"))

In [48]:
df_add_no_mes_sql.show()

+--------------------+----+
|         Prescribers|Novo|
+--------------------+----+
|0-19 PUBLIC HEALT...| Sim|
|7 DAY ACCESS LITT...| Sim|
|7 DAY ACCESS PHOE...| Sim|
|             A&E GPS| Sim|
|ABOUT HEALTH LN D...| Sim|
|ACTIVE RECOVERY (...| Sim|
|ACUTE ADULT MENTA...| Sim|
|ACUTE AND URGENT ...| Sim|
|ACUTE GP REFERRAL...| Sim|
|ACUTE MEDICAL UNI...| Sim|
| ACUTE MEDICINE DIR.| Sim|
| ACUTE MENTAL HEALTH| Sim|
|       ADEL BECK SCH| Sim|
|ADULT ADHD CHESHI...| Sim|
|   ADULT ADHD HALTON| Sim|
|ADULT ADHD KNOWSL...| Sim|
|ADULT LEARNING DI...| Sim|
|AIREBOROUGH COMMU...| Sim|
|    ALDINE HOUSE SCH| Sim|
| AMBER VALLEY HEALTH| Sim|
+--------------------+----+
only showing top 20 rows



### Queria deixar mais apresentavel o resultado apenas com SQL, mas infelizmente o pyspark entrou em conflito por conta do 'case'
- Testei no **Google BigQuery** e funcionou normalmente.

In [49]:
'''df_add_no_mes_sql = spark.sql("""
SELECT PRACTICE_NAME as Prescribers,
       CASE
       WHEN count(PRACTICE_NAME) = 1 THEN 'sim'
       else 'Nao'
       end as Novo,
FROM query_amostra
group by Prescribers
having count(PRACTICE_NAME) = 1
order by 1;
""")'''

'df_add_no_mes_sql = spark.sql("""\nSELECT PRACTICE_NAME as Prescribers,\n       CASE\n       WHEN count(PRACTICE_NAME) = 1 THEN \'sim\'\n       else \'Nao\'\n       end as Novo,\nFROM query_amostra\ngroup by Prescribers\nhaving count(PRACTICE_NAME) = 1\norder by 1;\n""")'

### Exportando o resultado para um csv e conferindo a disponibilidade / armazenamento

In [50]:
df_add_no_mes_sql.toPandas().to_csv('Warehouse/Mes_06/df_add_no_mes_sql.csv', index=False)
df_add_no_mes_sql = pd.read_csv(r'Warehouse/Mes_06/df_add_no_mes_sql.csv')
df_add_no_mes_sql

Unnamed: 0,Prescribers,Novo
0,0-19 PUBLIC HEALTH SERVICE HARTLEPOOL,Sim
1,7 DAY ACCESS LITTLEBOROUGH HUB,Sim
2,7 DAY ACCESS PHOENIX HUB,Sim
3,A&E GPS,Sim
4,ABOUT HEALTH LN DERMATOLOGY,Sim
...,...,...
510,WORKINGTON HEALTH LIMITED,Sim
511,WWL CN WEST LANCS,Sim
512,WYRE FOREST WARD,Sim
513,YORK ADULT COMMUNITY NURSING,Sim


---
# Aqui finalizo o terceiro e ultimo notebook com meu projeto do desafio!
#### Obrigado!