Skip to content
This repository
branch: master
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 302 lines (230 sloc) 9.196 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301

# create importation function
# to use different 'NULL AS <something>' options for the actual command that imports
# lines into monetdb: COPY <stuff> INTO <tablename> ...
sql.copy.into <-
    function( nullas , num.lines , tablename , tf2 , connection , delimiters ){

        # run a single COPY ... INTO attempt against the database.
        #
        # nullas      text appended after the delimiters clause (e.g. " NULL AS ''" or "")
        # num.lines   number of records named in the COPY statement
        # tablename   table that receives the records
        # tf2         path of the delimited file, placed inside single quotes
        # connection  handed unchanged to dbSendUpdate()
        # delimiters  text placed after "using delimiters"
        #
        # returns TRUE once dbSendUpdate() has come back without an error.

        # assemble the statement one clause at a time
        # (offset 2 presumably skips a header record -- confirm against the monetdb COPY INTO docs)
        how.many <- paste0( "copy " , num.lines , " offset 2 records" )
        destination <- paste0( " into " , tablename )
        origin <- paste0( " from '" , tf2 , "'" )
        parsing <- paste0( " using delimiters " , delimiters , nullas )

        # send the finished statement
        dbSendUpdate( connection , paste0( how.many , destination , origin , parsing ) )

        # reaching this line means the update did not throw
        TRUE
    }


# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# differences from the SAScii package's read.SAScii() --
# a whole lot faster
# no RAM issues
# skip.decimal.division must be TRUE/FALSE (as opposed to NULL - the user must decide)
# must read in the entire table
# requires RMonetDB plus the SAScii, descr, and downloader packages
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

read.SAScii.monetdb <-
    function(
        # differences between parameters for read.SAScii() (from the R SAScii package)
        # and read.SAScii.monetdb() documented here --
        fn ,                                # fixed-width file handed to fwf2csv(); when zipped = TRUE, the location handed to download.cache()
        sas_ri ,                            # SAS import instructions, passed straight to parse.SAScii()
        beginline = 1 ,                     # passed straight to parse.SAScii()
        zipped = FALSE ,                    # when TRUE, fn is fetched with download.cache() and unzipped first
        # n = -1 , # no n parameter available for this - you must read in the entire table!
        lrecl = NULL ,                      # passed straight to parse.SAScii()
        skip.decimal.division = FALSE ,     # skipping decimal division defaults to FALSE for this function!
        tl = FALSE ,                        # convert all column names to lowercase?
        tablename ,                         # name of the table created in the database
        overwrite = FALSE ,                 # overwrite existing table?
        connection ,                        # database connection used for every dbListTables() / dbSendUpdate() call
        tf.path = NULL ,                    # do temporary files need to be stored in a specific folder?
        # this option is useful for keeping protected data off of random temporary folders on your computer--
        # specifying this option creates the temporary file inside the folder specified
        delimiters = "'\t'" ,               # delimiters for the monetdb COPY INTO command
        sleep.between.col.updates = 0       # seconds to pause after each per-column UPDATE

    ) {

        # purpose: parse the SAS layout, convert the fixed-width file to a delimited file,
        # create `tablename` in the database, COPY the records in, rescale the numeric
        # columns that have a divisor, and drop the filler (gap) columns.
        # returns TRUE on success; stops if the table exists and overwrite = FALSE.

        # before anything else, create the temporary files needed for this function to run
        # if the user doesn't specify that the temporary files get stored in a temporary directory
        # just put them anywhere..
        if ( is.null( tf.path ) ){
            tf <- tempfile()
            td <- tempdir()
            tf2 <- tempfile()
            tf3 <- tempfile()
        } else {
            # otherwise, put them in the protected folder
            tf.path <- normalizePath( tf.path )
            td <- tf.path
            tf <- paste0( tf.path , "/" , tablename , "1" )
            tf2 <- paste0( tf.path , "/" , tablename , "2" )
            tf3 <- paste0( tf.path , "/" , tablename , "3" )
        }

        file.create( tf , tf2 , tf3 )

        # remove all three scratch files however the function exits.
        # add = TRUE matters: without it each later on.exit() call replaces the earlier
        # ones, which used to leave the downloaded archive and the full csv on disk.
        on.exit( unlink( c( tf , tf2 , tf3 ) ) , add = TRUE )


        # scientific notation contains a decimal point when converted to a character string..
        # so store the user's current value and get rid of it.
        # set scientific notation to something impossibly high. Inf doesn't work.
        user.defined.options <- options( scipen = 1000000 )

        # put the user's setting back on success *and* on error
        on.exit( options( user.defined.options ) , add = TRUE )


        # read.SAScii.monetdb depends on the SAScii package and the descr package
        # to install these packages, use the line:
        # install.packages( c( 'SAScii' , 'descr' , 'downloader' ) )
        library(SAScii)
        library(descr)
        library(downloader)

        if ( !exists( "download.cache" ) ){
            # load the download.cache and related functions
            # to prevent re-downloading of files once they've been downloaded.
            # NOTE(review): this executes code fetched from a remote url at run time --
            # consider pinning or vendoring the script.
            source_url(
                "https://raw.github.com/ajdamico/usgsd/master/Download%20Cache/download%20cache.R" ,
                prompt = FALSE ,
                echo = FALSE
            )
        }


        x <- parse.SAScii( sas_ri , beginline , lrecl )

        if( tl ) x$varname <- tolower( x$varname )

        # rows without a variable name are gaps (filler) in the layout
        is.gap <- is.na( x[ , 'varname' ] )
        num.gaps <- sum( is.gap )

        # if there are any gaps..
        if ( num.gaps > 0 ){

            # read them in as simple character strings
            x[ is.gap , 'char' ] <- TRUE
            x[ is.gap , 'divisor' ] <- 1

            # invert their widths (only gap rows carry negative widths)
            x[ is.gap , 'width' ] <- abs( x[ is.gap , 'width' ] )

            # name them toss_1 thru toss_###
            x[ is.gap , 'varname' ] <- paste( 'toss' , seq_len( num.gaps ) , sep = "_" )
        }

        # y describes the columns of the database table; once the gaps are named it matches x row for row
        y <- x

        # if the ASCII file is stored in an archive, unpack it into td and convert the unpacked file instead.
        if ( zipped ){
            # download the zipped file
            download.cache( fn , tf , mode = "wb" )
            # unzip the file's contents and store the file name within the temporary directory
            # NOTE(review): the unzipped file(s) are not removed afterwards -- confirm whether they should be.
            fn <- unzip( tf , exdir = td , overwrite = TRUE )
        }


        # check whether the table is already in the database..
        if ( tablename %in% dbListTables( connection ) ){

            # ..and either refuse to continue or remove it, depending on the overwrite flag
            if ( !overwrite ) stop( "table with this name already in database" )

            dbRemoveTable( connection , tablename )
        }

        if ( any( grepl( 'sample' , tolower( y$varname ) ) ) ){
            print( 'warning: variable named sample not allowed in monetdb' )
            print( 'changing column name to sample_' )
            # NOTE(review): the detection above ignores case but this replacement does not,
            # so an upper-case SAMPLE column triggers the warning without being renamed.
            y$varname <- gsub( 'sample' , 'sample_' , y$varname )
        }

        fields <- y$varname

        colTypes <- ifelse( !y[ , 'char' ] , 'DOUBLE PRECISION' , 'VARCHAR(255)' )

        colDecl <- paste( fields , colTypes )

        # built with paste0 rather than sprintf so a % in tablename cannot be read as a format
        sql.create <- paste0( "CREATE TABLE " , tablename , " (" , paste( colDecl , collapse = ", " ) , ")" )

        # starts and ends of every field, from the running total of the widths.
        # (vectorized so a single-column layout works; a 2:length( w ) loop counts backwards there)
        w <- abs ( x$width )
        e <- cumsum( w )
        s <- e - w + 1

        # create another file connection to the temporary file to store the fwf2csv output..
        zz <- file( tf3 , open = 'wt' )
        sink( zz , type = 'message' )

        # convert the fwf to a csv
        # verbose = TRUE prints a message, which has to be captured.
        tryCatch(
            fwf2csv( fn , tf2 , names = x$varname , begin = s , end = e , verbose = TRUE ) ,
            finally = {
                # stop storing the output and release the connection, even if the conversion failed,
                # so later messages are not swallowed
                sink( type = "message" )
                close( zz )
            }
        )

        # read the contents of that message into a character string
        zzz <- readLines( tf3 )

        # read the first line up to the first space..
        # ..and that's the number of lines in the file
        num.lines <- sub( " .*$" , "" , zzz[ 1 ] )

        # fail before touching the database when the message did not start with a record count
        if ( is.na( num.lines ) || !grepl( "^[0-9]+$" , num.lines ) ){
            stop( "could not determine the number of converted lines from the fwf2csv message" )
        }

        # in speed tests, adding the exact number of lines in the file was much faster
        # than setting a very high number and letting it finish..

        # create the table in the database
        dbSendUpdate( connection , sql.create )

        ##############################
        # begin importation attempts #

        # notice the differences in the NULL AS <stuff> for the five different attempts.
        # monetdb importation is finicky, so attempt a bunch of different COPY INTO tries
        # using the sql.copy.into() function
        null.clauses <-
            c(
                " NULL AS ''" ,
                " NULL AS ' '" ,
                "" ,
                paste0( " NULL AS '" , '""' , "'" ) ,
                " NULL AS '' ' '"
            )

        last.attempt <- length( null.clauses )

        for ( attempt in seq_len( last.attempt ) ){

            if ( attempt == last.attempt ){

                # this time without error-handling.
                sql.copy.into( null.clauses[ attempt ] , num.lines , tablename , tf2 , connection , delimiters )

            } else {

                # capture an error (without breaking)
                te <- try( sql.copy.into( null.clauses[ attempt ] , num.lines , tablename , tf2 , connection , delimiters ) , silent = TRUE )

                # stop trying as soon as one of the statements works
                if ( !inherits( te , "try-error" ) ) break

                cat( paste0( 'attempt #' , attempt , ' broke, trying method #' , attempt + 1 ) , "\r" )
            }
        }

        # end importation attempts #
        ############################


        # loop through all columns to:
        # multiply numeric columns by their divisor whenever it is not 1
        # (the divisor comes from parse.SAScii(); presumably a fraction such as 0.01 -- confirm against the SAScii docs)
        for ( l in seq_len( nrow( y ) ) ){

            needs.scaling <- ( y[ l , "divisor" ] != 1 ) && !( y[ l , "char" ] )

            if ( needs.scaling && !skip.decimal.division ){

                sql <-
                    paste(
                        "UPDATE" ,
                        tablename ,
                        "SET" ,
                        y[ l , 'varname' ] ,
                        "=" ,
                        y[ l , 'varname' ] ,
                        "*" ,
                        y[ l , "divisor" ]
                    )

                dbSendUpdate( connection , sql )

                # give the MonetDB mserver.exe a certain number of seconds to process each column
                Sys.sleep( sleep.between.col.updates )
            }

            cat( " current progress: " , l , "of" , nrow( y ) , "columns processed. " , "\r" )

        }

        # eliminate gap variables.. loop through every gap
        for ( i in seq_len( num.gaps ) ) {

            # create a SQL query to drop these columns
            sql.drop <- paste0( "ALTER TABLE " , tablename , " DROP toss_" , i )

            # and drop them!
            dbSendUpdate( connection , sql.drop )
        }

        # scientific notation length and the temporary files are reset by the on.exit() handlers above

        TRUE
    }
Something went wrong with that request. Please try again.