In [1]:
#!/usr/bin/env python
# coding: utf-8


YYYY = '2018'
MM = ['01']
DD = ['07']
MIN = ['20']
HH = ['06']

target ='/media/liwei/Data/AHI_AC_RESULT/'
SZA_path = '/media/liwei/Data/Solar_zenith_angle/'
SAZ_path = '/media/liwei/Data/Solar_azimuth_angle/'

VZA_path = '/media/liwei/Data/AHI_Angle/AHI_VZA_10.dat'
VAA_path = '/media/liwei/Data/AHI_Angle/AHI_VAA_10.dat'
LUT_path = '/media/liwei/Data/LUT/'
CAMS_path = '/media/liwei/Data/CAMS/'

res = 0.01

site = 27,96.5

u_lat,d_lat = 27+0.06,27-0.06
l_lon,r_lon = 96.5-0.06,96.5+0.06

In [18]:
import numpy as np
from Py6S import *
import time as T
from joblib import Parallel, delayed
from scipy.interpolate import griddata,interpn,RegularGridInterpolator
import math
import os
import datetime
import cv2
from ftplib import FTP
import rioxarray
import xarray as xr
import multiprocessing
import paramiko
from scp import SCPClient
import subprocess

   
class H8_data:

    def __init__(self , account , pw , band , band_number , date):
        self.account = account
        self.pw = pw
        self.band = band
        self.band_number = band_number
        self.date = date
    
    def get_path(self):
        return '/mnt/nas01G/geo01/H8AHI/download/org/192.168.1.5/gridded/FD/V20190123/' + self.date[0:6] + '/' + self.band.upper() + '/'

    def get_filename(self):
        return self.date + "." + self.band + "." + self.band_number + ".fld.geoss.bz2"
    
    def DN2TBB(self,data):
        LUT=np.loadtxt('/media/liwei/Data/count2tbb_v102/' + self.band + "." + self.band_number)
        return LUT[data,1]
    
    def file_path(self):
        return self.get_path() + self.get_filename() 
                 
    def download_H8data(self):
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname='10.4.200.105', port=22, username=self.account, password=self.pw)
        scp = SCPClient(client.get_transport())
        sftp = client.open_sftp()

        try :
            sftp.stat(self.file_path())

        except FileNotFoundError:
            print("File Not Found")
            return 'No data'

        else:
            scp.get(self.file_path(), folder_original+'/')
            p = subprocess.Popen('lbzip2 -d {}{}'.format(folder_original+'/',self.file_path()[-33:]),shell=True)
            p.communicate()
            print ('Himawari8/AHI data Processed Finish')
            return folder_original + '/' + self.get_filename()[:-4]
            
    def read_H8data(self):
        H8_file_path = self.download_H8data()
        if self.band == "vis":
            sr = 12000
        elif self.band == "ext":
            sr = 24000
        else:
            sr = 6000
        if H8_file_path != 'No data':
            with open(H8_file_path,'rb') as fp:
                data = np.frombuffer(fp.read(),dtype='>u2').reshape(sr,sr)
                data = self.DN2TBB(data)
                data = data/100

            print("data reading finish")
            return data[row_u_AHI:row_u_AHI + row_AHI , col_l_AHI:col_l_AHI + col_AHI]
        else:
            return 'No data'
        

class JAXA_data:
    def __init__(self,account,YYYY,MM,DD,HH):
        self.account = account
        self.YYYY = YYYY
        self.MM = MM
        self.DD = DD
        self.HH = HH
        
    def download_JAXA(self):
        ftp_addr = 'ftp.ptree.jaxa.jp'
        f=FTP(ftp_addr)
        f.login(self.account,'SP+wari8')
        remote_filepath = '/pub/model/ARP/MS/bet/' + self.YYYY + self.MM + '/' + self.DD + '/'
        f.cwd(remote_filepath)
        list=f.nlst()
        bufsize=1024
        for name in list:
            if name[13:17]== self.HH + '00':
                data=open(folder_original + name ,'wb')
                filename='RETR '+ name
                f.retrbinary(filename,data.write,bufsize)
                f.quit()
                return folder_original + name
    
    def read_JAXA(self):
        AOT_path = self.download_JAXA()
        ds = xr.open_dataset(AOT_path)
        
        aot550=ds['od550aer']
        AOT550=aot550.interp(lon=lon_y,lat=lat_x,method="nearest")
        AOT550=AOT550.values
        AOT550[AOT550>=max(AOT)] = max(AOT)-(1/10000)
        AOT550[AOT550<=min(AOT)] = min(AOT)+(1/10000)
    
        
        bc = ds['od550bc'].interp(lon=lon_y,lat=lat_x,method="nearest")
        so4 = ds['od550so4'].interp(lon=lon_y,lat=lat_x,method="nearest")
        oa = ds['od550oa'].interp(lon=lon_y,lat=lat_x,method="nearest")
        dust = ds['od550dust'].interp(lon=lon_y,lat=lat_x,method="nearest")
        ss = ds['od550ss'].interp(lon=lon_y,lat=lat_x,method="nearest")
        
        
        DL_6S = np.array(dust).reshape(row_AHI,col_AHI)
        SL_6S = np.array(so4).reshape(row_AHI,col_AHI) + np.array(bc).reshape(row_AHI,col_AHI)
        OC_6S = np.array(ss).reshape(row_AHI,col_AHI)
        WS_6S = np.array(oa).reshape(row_AHI,col_AHI)

        Total = DL_6S + SL_6S + OC_6S + WS_6S

        precent_DL_6S = DL_6S / Total
        precent_SL_6S = SL_6S / Total
        precent_OC_6S = OC_6S / Total
        precent_WS_6S = WS_6S / Total
        P = np.dstack((precent_DL_6S,precent_WS_6S,precent_OC_6S,precent_SL_6S))
        
        Aerosol_type = np.full((row_AHI,col_AHI),np.nan)
        
        for i in range(row_AHI):
            for j in range(col_AHI):
                if max(P[i,j,:]) == precent_OC_6S[i,j]:
                    Aerosol_type[i,j] = 1
                else:
                    Aerosol_type[i,j] = 0
                    
        return np.array(AOT550).reshape(row_AHI,col_AHI),Aerosol_type

class CAMS_data:
    def __init__(self,MM,HH):
        self.MM = MM
        self.HH = HH
        
    def ATMO_time(self):
        if int(self.HH)%3==0:
            return HH
        elif (int(self.HH)-1)%3==0:
            return str(int(HH)-1).zfill(2)
        elif int(self.HH)==23:
            return str(21).zfill(2)
        else:
            return str(int(self.HH)+1).zfill(2)
        
    def read_CAMS(self):

        ds_oz_wv = xr.open_dataset(CAMS_path + YYYY + self.MM + self.ATMO_time()[0] + '.nc') 
        oz=ds_oz_wv['gtco3'][int(DD[d])-1,:,:]
        OZ=oz.interp(longitude=lon_y,latitude=lat_x,method="nearest")
        OZ=OZ.values
        wv=ds_oz_wv['tcwv'][int(DD[d])-1,:,:]
        WV=wv.interp(longitude=lon_y,latitude=lat_x,method="nearest")
        WV=WV.values
        
#         Atmosphere data Unit conversion
        WV = WV/10
        OZ = OZ*46.6975764


#         Processing water vapor and ozone max and min
        OZ[OZ>=max(ozone)] = max(ozone)-(1/10000)
        OZ[OZ<=min(ozone)] = min(ozone)+(1/10000)
        WV[WV>=max(water)] = max(water)-(1/10000)
        WV[WV<=min(water)] = min(water)+(1/10000)
        return np.array(OZ).reshape(row_AHI,col_AHI),np.array(WV).reshape(row_AHI,col_AHI)

def griddata_inter(X1,X2,X3,point,xi,yi,Aero,Water,Aot,Raa):
    X1_new_inter=[]
    X2_new_inter=[]
    X3_new_inter=[]

    X1_inter=X1[Aero,Water,Aot,:,:,Raa].reshape(17*17,1)
    X2_inter=X2[Aero,Water,Aot,:,:,Raa].reshape(17*17,1)
    X3_inter=X3[Aero,Water,Aot,:,:,Raa].reshape(17*17,1)

    X1_new = griddata(point, X1_inter, (xi, yi), method='cubic')
    X2_new = griddata(point, X2_inter, (xi, yi), method='cubic')
    X3_new = griddata(point, X3_inter, (xi, yi), method='nearest')

    X1_new_inter.append(X1_new)
    X2_new_inter.append(X2_new)
    X3_new_inter.append(X3_new)

    del X1_inter,X2_inter,X3_inter,X1_new,X2_new,X3_new  
    return X1_new_inter,X2_new_inter,X3_new_inter

class LUT_interpolation:
    def __init__(self,LUT_path):
        self.LUT_path = LUT_path
    
    def LUT_interpolation(self):
        Continental_X1 = np.loadtxt(self.LUT_path + "Continental_01_band4.csv",delimiter=",").reshape(8,12,17,17,19)
        Continental_X2 = np.loadtxt(self.LUT_path + "Continental_02_band4.csv",delimiter=",").reshape(8,12,17,17,19)
        Continental_X3 = np.loadtxt(self.LUT_path + "Continental_03_band4.csv",delimiter=",").reshape(8,12,17,17,19)
        
        Maritime_X1 = np.loadtxt(self.LUT_path + "Maritime_01_band4.csv",delimiter=",").reshape(8,12,17,17,19)
        Maritime_X2 = np.loadtxt(self.LUT_path + "Maritime_02_band4.csv",delimiter=",").reshape(8,12,17,17,19)
        Maritime_X3 = np.loadtxt(self.LUT_path + "Maritime_03_band4.csv",delimiter=",").reshape(8,12,17,17,19)

        X1 = np.stack((Continental_X1,Maritime_X1))
        X2 = np.stack((Continental_X2,Maritime_X2))
        X3 = np.stack((Continental_X3,Maritime_X3))
        
        del Continental_X1,Maritime_X1,Continental_X2,Maritime_X2,Continental_X3,Maritime_X3

        
        point = np.array(np.meshgrid(sza, vza)).T.reshape(-1,2)
        xi,yi=np.ogrid[0:80:161j, 0:80:161j]
        output = Parallel(n_jobs=-1)(delayed(griddata_inter)(X1,X2,X3,point,xi,yi,Aero,Water,Aot,Raa)\
                                         for Aero in range(len(aero_type)) 
                                         for Water in range(len(water))                              
                                         for Aot in range(len(AOT))                              
                                         for Raa in range(len(raa)))

        X1_new_inter_reshape=np.array(output)[:,0].reshape(2,8,12,19,161,161)
        X2_new_inter_reshape=np.array(output)[:,1].reshape(2,8,12,19,161,161)
        X3_new_inter_reshape=np.array(output)[:,2].reshape(2,8,12,19,161,161)

        del X1,X2,X3,output

        sza_new = np.linspace(0,80,161)
        vza_new = np.linspace(0,80,161)

        fn1 = RegularGridInterpolator((aero_type,water,AOT,raa,sza_new,vza_new),X1_new_inter_reshape,bounds_error=False,fill_value=np.nan)
        fn2 = RegularGridInterpolator((aero_type,water,AOT,raa,sza_new,vza_new),X2_new_inter_reshape,bounds_error=False,fill_value=np.nan)
        fn3 = RegularGridInterpolator((aero_type,water,AOT,raa,sza_new,vza_new),X3_new_inter_reshape,bounds_error=False,fill_value=np.nan)

        return fn1,fn2,fn3
        
class AHI_angle:
    def __init__(self,date,angle):
        self.date = date
        self.angle = angle
        
    def download_AHI_angle(self):

        date1 = self.date[0:6]
        date2 = self.date[0:8]
        AHI_date = self.date
        
        sza_file_name = '{}.sun.zth.fld.4km.bin.bz2'.format(AHI_date)
        saa_file_name = '{}.sun.azm.fld.4km.bin.bz2'.format(AHI_date)


        if self.angle == 'sza':
            if os.path.exists(folder_original + sza_file_name[:-4]):
                return sza_file_name[:-4]

            else:
                ftp = FTP()
                ftp.connect('hmwr829gr.cr.chiba-u.ac.jp', 21)
                ftp.login()
                path = '/gridded/FD/V20190123/{}/4KM/{}/'.format(date1,date2)
                ftp.cwd(path)
                remote_list=ftp.nlst()
                bufsize=1024*1024
                if sza_file_name in remote_list:
                    data = open(folder_original + sza_file_name,'wb')
                    filename='RETR '+ sza_file_name
                    ftp.retrbinary(filename,data.write,bufsize)
                    ftp.quit()

                    return sza_file_name[:-4]
                else:
    #                 print('AHI NO DATA')
                    return None

        elif self.angle == 'saa':
            if os.path.exists(folder_original + saa_file_name[:-4]):
                return saa_file_name[:-4]

            else:
                ftp = FTP()
                ftp.connect('hmwr829gr.cr.chiba-u.ac.jp', 21)
                ftp.login()
                path = '/gridded/FD/V20190123/{}/4KM/{}/'.format(date1,date2)
                ftp.cwd(path)
                remote_list=ftp.nlst()
                bufsize=1024*1024
                if saa_file_name in remote_list:
                    data = open(folder_original + saa_file_name,'wb')
                    filename='RETR '+ saa_file_name
                    ftp.retrbinary(filename,data.write,bufsize)
                    ftp.quit()
                    return saa_file_name[:-4]
                else:
    #                 print('AHI NO DATA')
                    return 'No data'
    
    def Decompression(self,fn):
        if os.path.exists('{}.bz2'.format(folder_original + fn)):
            p = subprocess.Popen('lbzip2 -d {}.bz2'.format(folder_original + fn),shell=True)
            p.communicate()
        
        
    def read_AHI_solar_angle(self):
        return_code = self.download_AHI_angle()
        if return_code != 'No data':
            self.Decompression(return_code)
            with open(folder_original + return_code, 'rb' ) as fp:
                AHI_angle = np.frombuffer(fp.read(),dtype='>f4').reshape(3000,3000)[row_u_4KM:row_u_4KM + row_4KM , col_l_4KM:col_l_4KM + col_4KM]
            AHI_angle=cv2.resize(np.array(AHI_angle,dtype='float64'),(row_AHI,col_AHI),interpolation=cv2.INTER_NEAREST)
            return AHI_angle
        else:
            return 'No data'
        

def H8_Process(ACCOUNT,PW,Band,Band_number,Date):
    data = H8_data(ACCOUNT,PW,Band,Band_number,Date).read_H8data()
    return data

    
def JAXA_Process(ACCOUNT,YYYY,MM,DD,HH):
    JAXA = JAXA_data(ACCOUNT,YYYY,MM,DD,HH)
    AOT550,Aerosol_type = JAXA.read_JAXA()
    return AOT550,Aerosol_type

def AHI_angle_Process(date,angle):
    Angle = AHI_angle(date,angle).read_AHI_solar_angle()
    return Angle

def remove_original_file(path):
    subprocess.Popen('rm -rf {}'.format(path))
    
    
def mkdir(path):
    folder = os.path.exists(path) 
    if not folder:
        os.makedirs(path)

def calculate_6s_band4(i):
    Aero_input = Aerosol_type[i,:]
    WV_input = WV[i,:]
    AOT550_input = AOT550[i,:]
    RAA_input = RAA[i,:]
    SZA_input = AHI_SZA[i,:]
    view_zM_input = AHI_VZA[i,:]
    xi = np.array([Aero_input,WV_input,AOT550_input,RAA_input,SZA_input,view_zM_input])
    xi = xi.T
    xa = fn1(xi)
    xb = fn2(xi)
    xc = fn3(xi)
    y = xa * AHI_data[i,:]-xb
    SR = y/(1+xc*y)
    return SR

sza = np.linspace(0,80,17)
vza = np.linspace(0,80,17)
water = np.linspace(0,7,8)
ozone = np.linspace(0.2,0.4,5)
AOT = np.array([0.01,0.05,0.1,0.15,0.2,0.3,0.4,0.6,0.8,1.0,1.5,2.0])
raa = np.linspace(0,180,19)
aero_type = np.array([0,1])

row_AHI = round((u_lat - d_lat) / res)
col_AHI = round((r_lon - l_lon) / res)

row_u_AHI = round((60 - u_lat)/res)
col_l_AHI = round((l_lon - 85)/res)


lat_x = np.linspace(u_lat,d_lat + res,row_AHI)
lon_y = np.linspace(l_lon,r_lon - res,col_AHI)


row_4KM = round((u_lat - d_lat) / 0.04)
col_4KM = round((r_lon - l_lon) / 0.04)

row_u_4KM = round((60 - u_lat)/0.04)
col_l_4KM = round((l_lon - 85)/0.04)

fn1,fn2,fn3 = LUT_interpolation(LUT_path).LUT_interpolation()

with open(VZA_path,'rb') as fp:
    AHI_VZA = np.frombuffer(fp.read()).reshape(12000,12000)[row_u_AHI:row_u_AHI + row_AHI , col_l_AHI:col_l_AHI + col_AHI]
with open(VAA_path,'rb') as fp:
    AHI_VAA = np.frombuffer(fp.read()).reshape(12000,12000)[row_u_AHI:row_u_AHI + row_AHI , col_l_AHI:col_l_AHI + col_AHI]


In [19]:
for m in range(len(MM)):
    for d in range(len(DD)):
        for h in range(len(HH)):
            for mi in range(len(MIN)):
                start_time = T.time()
                date = YYYY+MM[m]+DD[d]+HH[h]+MIN[mi]
                time = HH[h] + MIN[mi]

                print("start processing {}".format(date))
                # make dir
                folder_original = target+date+'_original/'
                folder_AC = target+date+'_AC/'
                mkdir(folder_original)
                mkdir(folder_AC)
                # Download AHI
                AHI_data = H8_Process('liwei','liwei000','vis','03',date)
                if AHI_data == 'No data':
                    continue
        
                    # Solar angle
                AHI_SZA = AHI_angle_Process(date,'sza')
                AHI_SAA = AHI_angle_Process(date,'saa')
                
                if AHI_SZA == 'No data' or AHI_SAA == 'No data':
                    continue
                RAA = abs(AHI_SAA - AHI_VAA)
                RAA[RAA>180]=360-RAA[RAA>180]
                
                AOT550,Aerosol_type = JAXA_Process('liwei1997_chiba-u.jp',YYYY,MM[m],DD[d],HH[h])
                OZ,WV = CAMS_data(MM[m],HH[h]).read_CAMS()
                
                SR = Parallel(n_jobs=-1)(delayed(calculate_6s_band4)(i) for i in range(row_AHI))
                # Save file and remove download input data
                SR=np.array(SR).reshape(row_AHI,col_AHI)
                SR_file=open(folder_AC+'/'+date+'_b03.dat','wb')
                SR.astype('f4').tofile(SR_file)
                SR_file.close()
#                 remove_original_file(folder_original)
                end_time=T.time()
                TIME=end_time-start_time
                print('time: {:.1f} secs, {:.1f} mins,{:.1f} hours'.format(TIME,TIME/60,TIME/3600))
                print("delete file finish")
                    
                    
                    

start processing 201801070620


lbzip2: skipping "/media/liwei/Data/AHI_AC_RESULT/201801070620_original//201801070620.vis.03.fld.geoss.bz2": open("/media/liwei/Data/AHI_AC_RESULT/201801070620_original//201801070620.vis.03.fld.geoss"): File exists


Himawari8/AHI data Processed Finish
data reading finish


  if AHI_data == 'No data':
  if AHI_SZA == 'No data' or AHI_SAA == 'No data':


time: 4.2 secs, 0.1 mins,0.0 hours
delete file finish
